From b15fa09acd957cf62a2a1057cfb9784bd9b4c4fe Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Tue, 6 Sep 2022 20:11:50 +0800 Subject: [PATCH] enh: qinf process optimization for duplicated commit version --- source/dnode/vnode/src/inc/sma.h | 1 + source/dnode/vnode/src/sma/smaCommit.c | 21 ++++++++++++++++----- source/dnode/vnode/src/sma/smaFS.c | 12 ++++++++++++ source/dnode/vnode/src/sma/smaRollup.c | 17 ++++++++++++----- 4 files changed, 41 insertions(+), 10 deletions(-) diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index c9d1c3d015..165f5da4b6 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -211,6 +211,7 @@ int32_t tdRSmaFSOpen(SSma *pSma, int64_t version); void tdRSmaFSClose(SRSmaFS *fs); int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t version); void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t version); +int64_t tdRSmaFSMaxVer(SSma *pSma, SRSmaStat *pStat); int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile); int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer); int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index fd300ef34a..f08cefdb04 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -182,6 +182,7 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) { SVnode *pVnode = pSma->pVnode; SRSmaFS *pFS = RSMA_FS(pStat); int64_t committed = pStat->commitAppliedVer; + int64_t fsMaxVer = -1; char qTaskInfoFullName[TSDB_FILENAME_LEN]; taosWLockLatch(RSMA_FS_LOCK(pStat)); @@ -204,10 +205,20 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) { ++i; } - SQTaskFile qFile = {.nRef = 1, .padding = 0, .version = committed, .size = 0}; - if (tdRSmaFSUpsertQTaskFile(pFS, &qFile) < 0) { - taosWUnLockLatch(RSMA_FS_LOCK(pStat)); - return TSDB_CODE_FAILED; + if (taosArrayGetSize(pFS->aQTaskInf) > 0) { + fsMaxVer = ((SQTaskFile *)taosArrayGetLast(pFS->aQTaskInf))->version; + } + + if (fsMaxVer < committed) { + SQTaskFile qFile = {.nRef = 1, .padding = 0, .version = committed, .size = 0}; + if (taosArrayPush(pFS->aQTaskInf, &qFile) < 0) { + taosWUnLockLatch(RSMA_FS_LOCK(pStat)); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + } else { + smaDebug("vgId:%d, update qinf, no need as committed %" PRIi64 " not larger than fsMaxVer %" PRIi64, TD_VID(pVnode), + committed, fsMaxVer); } taosWUnLockLatch(RSMA_FS_LOCK(pStat)); @@ -365,7 +376,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { return TSDB_CODE_SUCCESS; } - SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); + SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); // step 1: merge qTaskInfo and iQTaskInfo // lock diff --git a/source/dnode/vnode/src/sma/smaFS.c b/source/dnode/vnode/src/sma/smaFS.c index 9c3a0a0b9e..8e8611f0e8 100644 --- a/source/dnode/vnode/src/sma/smaFS.c +++ b/source/dnode/vnode/src/sma/smaFS.c @@ -96,6 +96,18 @@ int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t version) { return oldVal; } +int64_t tdRSmaFSMaxVer(SSma *pSma, SRSmaStat *pStat) { + SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf; + int64_t version = -1; + + taosRLockLatch(RSMA_FS_LOCK(pStat)); + if (taosArrayGetSize(aQTaskInf) > 0) { + version = ((SQTaskFile *)taosArrayGetLast(aQTaskInf))->version; + } + taosRUnLockLatch(RSMA_FS_LOCK(pStat)); + return version; +} + void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t version) { SVnode *pVnode = pSma->pVnode; SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 14bf479533..920d04bf6c 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -1453,17 +1453,24 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { return TSDB_CODE_SUCCESS; } + int64_t fsMaxVer = tdRSmaFSMaxVer(pSma, pRSmaStat); + if (pRSmaStat->commitAppliedVer <= fsMaxVer) { + smaDebug("vgId:%d, rsma persist, no need as applied %" PRIi64 " not larger than fsMaxVer %" PRIi64, vid, + pRSmaStat->commitAppliedVer, fsMaxVer); + return TSDB_CODE_SUCCESS; + } + STFile tFile = {0}; #if 0 if (pRSmaStat->commitAppliedVer > 0) { char qTaskInfoFName[TSDB_FILENAME_LEN]; tdRSmaQTaskInfoGetFileName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName); if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) { - smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr()); + smaError("vgId:%d, rsma persist, init %s failed since %s", vid, qTaskInfoFName, terrstr()); goto _err; } if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) { - smaError("vgId:%d, rsma persit, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr()); + smaError("vgId:%d, rsma persist, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr()); goto _err; } smaDebug("vgId:%d, rsma, serialize qTaskInfo, file %s created", vid, TD_TFILE_FULL_NAME(&tFile)); @@ -1513,11 +1520,11 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { char qTaskInfoFName[TSDB_FILENAME_LEN]; tdRSmaQTaskInfoGetFileName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName); if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) { - smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr()); + smaError("vgId:%d, rsma persist, init %s failed since %s", vid, qTaskInfoFName, terrstr()); goto _err; } if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) { - smaError("vgId:%d, rsma persit, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr()); + smaError("vgId:%d, rsma persist, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr()); goto _err; } smaDebug("vgId:%d, rsma, table %" PRIi64 " serialize qTaskInfo, file %s created", vid, pRSmaInfo->suid, @@ -1561,7 +1568,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { } return TSDB_CODE_SUCCESS; _err: - smaError("vgId:%d, rsma persit failed since %s", vid, terrstr()); + smaError("vgId:%d, rsma persist failed since %s", vid, terrstr()); if (isFileCreated) { tdRemoveTFile(&tFile); tdDestroyTFile(&tFile);