From 274d310f333bfefd1cfdd38ec381504b99a46a11 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Mon, 19 Sep 2022 17:36:51 +0800 Subject: [PATCH] feat: retention should not block data commit --- source/dnode/vnode/CMakeLists.txt | 2 +- source/dnode/vnode/src/inc/vnodeInt.h | 5 + source/dnode/vnode/src/tsdb/tsdbFS.c | 2 +- source/dnode/vnode/src/tsdb/tsdbRetention.c | 24 +++- source/dnode/vnode/src/tsdb/tsdbRetention2.c | 140 +++++++++++++++++++ source/dnode/vnode/src/vnd/vnodeOpen.c | 12 ++ source/dnode/vnode/src/vnd/vnodeSvr.c | 83 ++++++++++- 7 files changed, 262 insertions(+), 6 deletions(-) create mode 100644 source/dnode/vnode/src/tsdb/tsdbRetention2.c diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 7a99d26683..092c861766 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -49,7 +49,7 @@ target_sources( "src/tsdb/tsdbUtil.c" "src/tsdb/tsdbSnapshot.c" "src/tsdb/tsdbCacheRead.c" - "src/tsdb/tsdbRetention.c" + "src/tsdb/tsdbRetention2.c" "src/tsdb/tsdbDiskData.c" "src/tsdb/tsdbCompress.c" "src/tsdb/tsdbCompact.c" diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 0e85e7bfb6..7a779454c6 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -58,6 +58,7 @@ typedef struct STQ STQ; typedef struct SVState SVState; typedef struct SVBufPool SVBufPool; typedef struct SQWorker SQHandle; +typedef struct STrimDbHandle STrimDbHandle; typedef struct STsdbKeepCfg STsdbKeepCfg; typedef struct SMetaSnapReader SMetaSnapReader; typedef struct SMetaSnapWriter SMetaSnapWriter; @@ -301,6 +302,9 @@ struct STsdbKeepCfg { int32_t keep1; int32_t keep2; }; +struct STrimDbHandle { + volatile int8_t state; // 0 not in trim, 1 in trim +}; struct SVnode { char* path; @@ -325,6 +329,7 @@ struct SVnode { bool restored; tsem_t syncSem; SQHandle* pQuery; + STrimDbHandle trimDbH; }; #define TD_VID(PVNODE) ((PVNODE)->config.vgId) diff 
--git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 10926ae6ad..bd3ebfdcd4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -1045,7 +1045,7 @@ _err: return code; } -int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS) { +int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS) { int32_t code = 0; int32_t nRef; diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 2c68c57176..02d0d12ea2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -16,10 +16,28 @@ #include "tsdb.h" static bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) { + if (taosArrayGetSize(pTsdb->fs.aDFileSet) == 0) { + return false; + } + + SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, 0); + if (tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now) < 0) { + return true; + } + + if (tfsGetLevel(pTsdb->pVnode->pTfs) < 2) { + return false; + } + + STsdbKeepCfg *keepCfg = &pTsdb->keepCfg; + if (keepCfg->keep0 == keepCfg->keep1 && keepCfg->keep1 == keepCfg->keep2) { + return false; + } + for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) { - SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet); - int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); - SDiskID did; + pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet); + int32_t expLevel = tsdbFidLevel(pSet->fid, keepCfg, now); + SDiskID did; if (expLevel == pSet->diskId.level) continue; diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention2.c b/source/dnode/vnode/src/tsdb/tsdbRetention2.c new file mode 100644 index 0000000000..4bdae09d88 --- /dev/null +++ b/source/dnode/vnode/src/tsdb/tsdbRetention2.c @@ -0,0 +1,140 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. 
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "tsdb.h"
+
+/**
+ * @brief Cheap pre-check that tells tsdbDoRetention() whether there is anything to do.
+ *
+ * Returns true when the first file set, or any file set whose level differs from its expected
+ * level, has a negative expected level (tsdbDoRetention() removes such sets), or when
+ * tfsAllocDisk() hands out a disk on a level other than the one a file set currently lives on.
+ * Returns false when there are no file sets, when tfsGetLevel() reports fewer than 2, when
+ * keep0 == keep1 == keep2, or when tfsAllocDisk() fails.
+ *
+ * @param pTsdb tsdb instance whose file sets are inspected
+ * @param now   reference time handed to tsdbFidLevel()
+ * @return true if a retention pass is needed, false otherwise
+ */
+static bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) {
+  if (taosArrayGetSize(pTsdb->fs.aDFileSet) == 0) return false;
+
+  // the first file set is checked on its own, before any storage-level consideration
+  SDFileSet *pFSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, 0);
+  if (tsdbFidLevel(pFSet->fid, &pTsdb->keepCfg, now) < 0) return true;
+
+  // everything below concerns moving file sets between levels
+  // NOTE(review): tfsGetLevel() presumably returns the number of storage tiers -- confirm
+  if (tfsGetLevel(pTsdb->pVnode->pTfs) < 2) return false;
+
+  STsdbKeepCfg *pKeep = &pTsdb->keepCfg;
+  bool          allKeepEqual = (pKeep->keep0 == pKeep->keep1) && (pKeep->keep1 == pKeep->keep2);
+  if (allKeepEqual) return false;
+
+  int32_t idx = 0;
+  while (idx < taosArrayGetSize(pTsdb->fs.aDFileSet)) {
+    pFSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, idx);
+    idx++;
+
+    int32_t wantLevel = tsdbFidLevel(pFSet->fid, pKeep, now);
+    if (wantLevel == pFSet->diskId.level) continue;
+    if (wantLevel < 0) return true;
+
+    // a failed allocation ends the check with "nothing to do"
+    SDiskID target;
+    if (tfsAllocDisk(pTsdb->pVnode->pTfs, wantLevel, &target) < 0) return false;
+    if (target.level != pFSet->diskId.level) return true;
+  }
+
+  return false;
+}
+
+/**
+ * @brief Data migration between multi-tier storage, including remove expired data.
+ *  1) firstly, remove expired DFileSet;
+ *  2) partition the tsdbFS by the expLevel and fileSize(e.g.
+ *     500G, configurable), and migrate DFileSet groups between multi-tier storage;
+ *  3) update the tsdbFS and CURRENT in the same transaction;
+ *  4) finish
+ *
+ * What the code below does: it works on a private copy of the file system state (tsdbFSCopy),
+ * drops every file set whose expected level is negative, copies file sets whose expected level
+ * maps to a disk on another level, and publishes the copy with tsdbFSCommit1() followed by
+ * tsdbFSCommit2() (the latter under pTsdb->rwLock).
+ *
+ * @param pTsdb tsdb instance to trim
+ * @param now   reference time handed to tsdbFidLevel()
+ * @return 0 on success or when tsdbShouldDoRetention() reports nothing to do; terrno when no
+ *         target disk can be allocated; otherwise the error code of the failing step
+ */
+int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
+  int32_t code = 0;
+
+  if (!tsdbShouldDoRetention(pTsdb, now)) {
+    return code;
+  }
+
+  // do retention on a private copy; pTsdb itself is only changed by the commit calls below
+  STsdbFS fs;
+
+  code = tsdbFSCopy(pTsdb, &fs);
+  if (code) goto _err;
+
+  for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); iSet++) {
+    SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet);
+    int32_t    expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
+    SDiskID    did;
+
+    if (expLevel < 0) {
+      // expired: free the copy's file descriptors and drop the set from the copy
+      // NOTE(review): only aSttF[0] is released -- confirm a set never holds more than one stt file here
+      taosMemoryFree(pSet->pHeadF);
+      taosMemoryFree(pSet->pDataF);
+      taosMemoryFree(pSet->aSttF[0]);
+      taosMemoryFree(pSet->pSmaF);
+      taosArrayRemove(fs.aDFileSet, iSet);
+      iSet--;  // the next element moved into this slot; visit it on the next pass
+    } else {
+      if (expLevel == 0) continue;
+      if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) {
+        // no target disk: give up quietly, _exit releases the copy
+        code = terrno;
+        goto _exit;
+      }
+
+      if (did.level == pSet->diskId.level) continue;
+
+      // copy file to new disk (todo)
+      SDFileSet fSet = *pSet;
+      fSet.diskId = did;
+
+      code = tsdbDFileSetCopy(pTsdb, pSet, &fSet);
+      if (code) goto _err;
+
+      code = tsdbFSUpsertFSet(&fs, &fSet);
+      if (code) goto _err;
+    }
+  }
+
+  // do change fs
+  code = tsdbFSCommit1(pTsdb, &fs);
+  if (code) goto _err;
+
+  taosThreadRwlockWrlock(&pTsdb->rwLock);
+  code = tsdbFSCommit2(pTsdb, &fs);
+  taosThreadRwlockUnlock(&pTsdb->rwLock);
+  if (code) goto _err;
+
+_exit:
+  // reached on success and when no disk could be allocated; the private copy used to leak in
+  // the latter case because the destroy call sat above this label
+  tsdbFSDestroy(&fs);
+  return code;
+
+_err:
+  tsdbError("vgId:%d, tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
+  ASSERT(0);
+  // TODO: roll the file system back (tsdbFSRollback) and release fs once that is safe on this path
+  return code;
+}
diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c
index 4ccfea4051..481c8b66be 100644
--- a/source/dnode/vnode/src/vnd/vnodeOpen.c
+++
b/source/dnode/vnode/src/vnd/vnodeOpen.c
@@ -186,6 +186,25 @@ void vnodePreClose(SVnode *pVnode) {
   }
 }
 
+/**
+ * @brief Block until no background trim thread is running on this vnode.
+ *
+ * Polls trimDbH.state, which vnodeProcessTrimReqFunc() resets to 0 when it finishes.
+ */
+static void vnodeTrimDbClose(SVnode *pVnode) {
+  int32_t nLoops = 0;
+  while (atomic_load_8(&pVnode->trimDbH.state) != 0) {
+    // yield on every pass so the wait does not monopolize a core; trace once per 1000 passes
+    sched_yield();
+    if (++nLoops > 1000) {
+      vTrace("vgId:%d, wait for trimDb task to finish", TD_VID(pVnode));
+      nLoops = 0;
+    }
+  }
+}
+
 void vnodeClose(SVnode *pVnode) {
   if (pVnode) {
+    // wait first: the trim thread uses pVnode->pTsdb and pVnode->pSma and must be done before
+    // smaClose() and the other close calls later in this function run
+    vnodeTrimDbClose(pVnode);
     vnodeCommit(pVnode);
diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c
index 6e9eba306a..728b5b4cda 100644
--- a/source/dnode/vnode/src/vnd/vnodeSvr.c
+++ b/source/dnode/vnode/src/vnd/vnodeSvr.c
@@ -378,6 +378,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
   pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
 }
 
+#if 0
 static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
   int32_t     code = 0;
   SVTrimDbReq trimReq = {0};
@@ -400,6 +401,122 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq,
 _exit:
   return code;
 }
+#endif
+
+/* Argument bundle for the detached trim thread; vnodeProcessTrimReqFunc() frees it. */
+typedef struct {
+  SVnode     *pVnode;   // vnode to run retention on
+  SVTrimDbReq trimReq;  // decoded trim request; its timestamp is passed to the retention calls
+} SVndTrimDbReq;
+
+/**
+ * @brief Entry point of the detached "vnode-trim" thread.
+ *
+ * Runs tsdbDoRetention() and, if that succeeds, smaDoRetention() with the request timestamp, logs
+ * the outcome, resets the vnode's trim flag (1 -> 0) and frees the request.
+ *
+ * @param param SVndTrimDbReq allocated by vnodeProcessTrimReq(); freed here
+ * @return always NULL
+ */
+void *vnodeProcessTrimReqFunc(void *param) {
+  int32_t        code = 0;
+  SVndTrimDbReq *pReq = (SVndTrimDbReq *)param;
+  SVnode        *pVnode = pReq->pVnode;
+
+  setThreadName("vnode-trim");
+
+  code = tsdbDoRetention(pVnode->pTsdb, pReq->trimReq.timestamp);
+  if (code == 0) {
+    code = smaDoRetention(pVnode->pSma, pReq->trimReq.timestamp);
+  }
+
+  // report a failure as such instead of logging "finished" unconditionally
+  if (code != 0) {
+    vError("vgId:%d, trim vnode thread failed since %s, time:%d", TD_VID(pVnode), tstrerror(code),
+           pReq->trimReq.timestamp);
+  } else {
+    vInfo("vgId:%d, trim vnode thread finished, time:%d", TD_VID(pVnode), pReq->trimReq.timestamp);
+  }
+
+  // vnodeTrimDbClose() spins on this flag during vnodeClose(), so pVnode is not used past this point
+  int8_t oldVal = atomic_val_compare_exchange_8(&pVnode->trimDbH.state, 1, 0);
+  ASSERT(oldVal == 1);
+  taosMemoryFree(pReq);
+  return NULL;
+}
+
+/**
+ * @brief Handle a trim request by running retention in a detached background thread.
+ *
+ * Only one trim runs per vnode at a time: if trimDbH.state is already 1 the request is dropped and
+ * 0 is returned. Once the thread is created it owns the decoded request and resets the flag itself.
+ *
+ * @param pVnode  vnode to trim
+ * @param version not used by this handler
+ * @param pReq    serialized SVTrimDbReq
+ * @param len     length of pReq in bytes
+ * @param pRsp    not used by this handler
+ * @return 0 when the thread was started or the request was a duplicate, otherwise an error code
+ *         (also stored in terrno)
+ */
+static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
+  int32_t        code = 0;
+  int32_t        ret = 0;
+  int32_t        sysErr = 0;
+  TdThread       tid;
+  TdThreadAttr   thAttr = {0};
+  STrimDbHandle *pHandle = &pVnode->trimDbH;
+  SVndTrimDbReq *pVndTrimReq = taosMemoryMalloc(sizeof(SVndTrimDbReq));
+
+  if (!pVndTrimReq) {
+    code = TSDB_CODE_OUT_OF_MEMORY;
+    goto _exit;
+  }
+
+  pVndTrimReq->pVnode = pVnode;
+
+  if (tDeserializeSVTrimDbReq(pReq, len, &pVndTrimReq->trimReq) != 0) {
+    code = TSDB_CODE_INVALID_MSG;
+    goto _exit;
+  }
+
+  // claim the single trim slot (0 -> 1); losing the race means a trim is already running
+  if (atomic_val_compare_exchange_8(&pHandle->state, 0, 1) != 0) {
+    vInfo("vgId:%d, trim vnode request will not be processed since duplicated req, time:%d", TD_VID(pVnode),
+          pVndTrimReq->trimReq.timestamp);
+    goto _exit;
+  }
+
+  vInfo("vgId:%d, trim vnode request will be processed, time:%d", TD_VID(pVnode), pVndTrimReq->trimReq.timestamp);
+
+  taosThreadAttrInit(&thAttr);
+  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_DETACHED);
+
+  // pthread_create() reports failure through its return value and leaves errno alone, so the
+  // returned number is used; errno is only a fallback should the wrapper return a negative value
+  // (assumes taosThreadCreate() forwards to pthread_create() -- TODO confirm)
+  ret = taosThreadCreate(&tid, &thAttr, vnodeProcessTrimReqFunc, (void *)pVndTrimReq);
+  sysErr = (ret > 0) ? ret : errno;
+  taosThreadAttrDestroy(&thAttr);
+
+  if (ret != 0) {
+    code = TAOS_SYSTEM_ERROR(sysErr);
+    int8_t oldVal = atomic_val_compare_exchange_8(&pHandle->state, 1, 0);
+    ASSERT(oldVal == 1);
+    vError("vgId:%d, failed to create pthread for trim vnode since %s", TD_VID(pVnode), tstrerror(code));
+    goto _exit;
+  }
+
+  vDebug("vgId:%d, success to create pthread for trim vnode", TD_VID(pVnode));
+  pVndTrimReq = NULL;  // the trim thread owns the request from here on and frees it
+
+_exit:
+  if (pVndTrimReq != NULL) {
+    taosMemoryFree(pVndTrimReq);
+  }
+  terrno = code;
+  return code;
+}
 
 static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
   SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
@@ -921,7 +1038,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
   }
 
   if (taosArrayGetSize(newTbUids) > 0) {
-    vDebug("vgId:%d,
add %d table into query table list in handling submit", TD_VID(pVnode), (int32_t)taosArrayGetSize(newTbUids)); + vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), + (int32_t)taosArrayGetSize(newTbUids)); } tqUpdateTbUidList(pVnode->pTq, newTbUids, true);