From be1af580c2a1f2c86fc0bf72873e04fa8459e065 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 10 Jul 2022 12:25:11 +0000 Subject: [PATCH] tsdb retention function --- source/dnode/vnode/src/inc/tsdb.h | 68 +++++------ source/dnode/vnode/src/tsdb/tsdbFS.c | 8 ++ .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 111 ++++++++++++++++++ source/dnode/vnode/src/tsdb/tsdbRetention.c | 32 ++++- source/dnode/vnode/src/tsdb/tsdbUtil.c | 32 +++-- 5 files changed, 206 insertions(+), 45 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index cce3da60cb..6c874f2797 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -32,39 +32,38 @@ extern "C" { #define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) // clang-format on -typedef struct TSDBROW TSDBROW; -typedef struct TABLEID TABLEID; -typedef struct TSDBKEY TSDBKEY; -typedef struct SDelData SDelData; -typedef struct SDelIdx SDelIdx; -typedef struct STbData STbData; -typedef struct SMemTable SMemTable; -typedef struct STbDataIter STbDataIter; -typedef struct STable STable; -typedef struct SMapData SMapData; -typedef struct SBlockIdx SBlockIdx; -typedef struct SBlock SBlock; -typedef struct SBlockStatis SBlockStatis; -typedef struct SAggrBlkCol SAggrBlkCol; -typedef struct SColData SColData; -typedef struct SBlockDataHdr SBlockDataHdr; -typedef struct SBlockData SBlockData; -typedef struct SDelFile SDelFile; -typedef struct STsdbCacheFile STsdbCacheFile; -typedef struct SHeadFile SHeadFile; -typedef struct SDataFile SDataFile; -typedef struct SLastFile SLastFile; -typedef struct SSmaFile SSmaFile; -typedef struct SDFileSet SDFileSet; -typedef struct SDataFWriter SDataFWriter; -typedef struct SDataFReader SDataFReader; -typedef struct SDelFWriter SDelFWriter; -typedef struct SDelFReader SDelFReader; -typedef struct SRowIter SRowIter; -typedef struct STsdbFS STsdbFS; -typedef struct SRowMerger SRowMerger; -typedef struct STsdbFSState STsdbFSState; -typedef struct STsdbSnapHdr STsdbSnapHdr; +typedef struct TSDBROW TSDBROW; +typedef struct TABLEID TABLEID; +typedef struct TSDBKEY TSDBKEY; +typedef struct SDelData SDelData; +typedef struct SDelIdx SDelIdx; +typedef struct STbData STbData; +typedef struct SMemTable SMemTable; +typedef struct STbDataIter STbDataIter; +typedef struct STable STable; +typedef struct SMapData SMapData; +typedef struct SBlockIdx SBlockIdx; +typedef struct SBlock SBlock; +typedef struct SBlockStatis SBlockStatis; +typedef struct SAggrBlkCol SAggrBlkCol; +typedef struct SColData SColData; +typedef struct SBlockDataHdr SBlockDataHdr; +typedef struct SBlockData SBlockData; +typedef struct SDelFile SDelFile; +typedef struct SHeadFile SHeadFile; +typedef struct SDataFile SDataFile; +typedef struct SLastFile SLastFile; +typedef struct SSmaFile SSmaFile; +typedef struct SDFileSet SDFileSet; +typedef struct SDataFWriter SDataFWriter; +typedef struct SDataFReader SDataFReader; +typedef struct SDelFWriter SDelFWriter; +typedef struct SDelFReader SDelFReader; +typedef struct SRowIter SRowIter; +typedef struct STsdbFS STsdbFS; +typedef struct SRowMerger SRowMerger; +typedef struct STsdbFSState STsdbFSState; +typedef struct STsdbSnapHdr STsdbSnapHdr; #define TSDB_MAX_SUBBLOCKS 8 #define TSDB_FHDR_SIZE 512 @@ -163,6 +162,7 @@ int32_t tGetMapData(uint8_t *p, SMapData *pMapData); // other int32_t tsdbKeyFid(TSKEY key, int32_t minutes, int8_t precision); void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minKey, TSKEY *maxKey); +int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t now); int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline); void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg); // tsdbMemTable ============================================================================================== @@ -200,6 +200,7 @@ int32_t tsdbFSRollback(STsdbFS *pFS); int32_t tsdbFSStateUpsertDelFile(STsdbFSState *pState, SDelFile *pDelFile); int32_t tsdbFSStateUpsertDFileSet(STsdbFSState *pState, SDFileSet *pSet); +void tsdbFSStateDeleteDFileSet(STsdbFSState *pState, int32_t fid); SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState); SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid); // tsdbReaderWriter.c ============================================================================================== @@ -213,6 +214,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ SBlockIdx *pBlockIdx, SBlock *pBlock, int8_t cmprAlg); SDFileSet *tsdbDataFWriterGetWSet(SDataFWriter *pWriter); +int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo); // SDataFReader int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet); int32_t tsdbDataFReaderClose(SDataFReader **ppReader); diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 4a33dab08c..53b6735c30 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -688,6 +688,14 @@ _exit: return code; } +void tsdbFSStateDeleteDFileSet(STsdbFSState *pState, int32_t fid) { + int32_t idx; + + idx = taosArraySearchIdx(pState->aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ); + ASSERT(idx >= 0); + taosArrayRemove(pState->aDFileSet, idx); +} + SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState) { return pState->pDelFile; } SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid) { diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index e96ee03b03..4e7a9d3b04 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -1913,3 +1913,114 @@ _err: taosArrayDestroy(aBlockCol); return code; } + +int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { + int32_t code = 0; + int64_t n; + int64_t size; + TdFilePtr pOutFD = NULL; // TODO + TdFilePtr PInFD = NULL; // TODO + char fNameFrom[TSDB_FILENAME_LEN]; + char fNameTo[TSDB_FILENAME_LEN]; + + // head + tsdbDataFileName(pTsdb, pSetFrom, TSDB_HEAD_FILE, fNameFrom); + tsdbDataFileName(pTsdb, pSetTo, TSDB_HEAD_FILE, fNameTo); + + pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); + if (pOutFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + PInFD = taosOpenFile(fNameFrom, TD_FILE_READ); + if (PInFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->fHead.size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + taosCloseFile(&pOutFD); + taosCloseFile(&PInFD); + + // data + tsdbDataFileName(pTsdb, pSetFrom, TSDB_DATA_FILE, fNameFrom); + tsdbDataFileName(pTsdb, pSetTo, TSDB_DATA_FILE, fNameTo); + + pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); + if (pOutFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + PInFD = taosOpenFile(fNameFrom, TD_FILE_READ); + if (PInFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->fData.size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + taosCloseFile(&pOutFD); + taosCloseFile(&PInFD); + + // last + tsdbDataFileName(pTsdb, pSetFrom, TSDB_LAST_FILE, fNameFrom); + tsdbDataFileName(pTsdb, pSetTo, TSDB_LAST_FILE, fNameTo); + pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); + if (pOutFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + PInFD = taosOpenFile(fNameFrom, TD_FILE_READ); + if (PInFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->fLast.size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + taosCloseFile(&pOutFD); + taosCloseFile(&PInFD); + + // sma + tsdbDataFileName(pTsdb, pSetFrom, TSDB_SMA_FILE, fNameFrom); + tsdbDataFileName(pTsdb, pSetTo, TSDB_SMA_FILE, fNameTo); + + pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); + if (pOutFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + PInFD = taosOpenFile(fNameFrom, TD_FILE_READ); + if (PInFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->fSma.size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + taosCloseFile(&pOutFD); + taosCloseFile(&PInFD); + + return code; + +_err: + tsdbError("vgId:%d tsdb DFileSet copy failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + return code; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index e73f3f947c..1b6839459f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -25,8 +25,37 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) { // do retention for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs->nState->aDFileSet); iSet++) { SDFileSet *pDFileSet = (SDFileSet *)taosArrayGet(pTsdb->fs->nState->aDFileSet, iSet); + int32_t expLevel = tsdbFidLevel(pDFileSet->fid, &pTsdb->keepCfg, now); + SDiskID did; - // TODO + // check + if (expLevel == pDFileSet->fid) continue; + + if (expLevel < 0) { + tsdbFSStateDeleteDFileSet(pTsdb->fs->nState, pDFileSet->fid); + iSet--; + // tsdbInfo("vgId:%d file is out of data, remove it", td); + } else { + // alloc + if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) { + code = terrno; + goto _err; + } + + if (did.level == pDFileSet->diskId.level) continue; + + ASSERT(did.level > pDFileSet->diskId.level); + + // copy the file to new disk + SDFileSet nDFileSet = *pDFileSet; + nDFileSet.diskId = did; + + code = tsdbDFileSetCopy(pTsdb, pDFileSet, &nDFileSet); + if (code) goto _err; + + code = tsdbFSStateUpsertDFileSet(pTsdb->fs->nState, &nDFileSet); + if (code) goto _err; + } } // commit @@ -38,5 +67,6 @@ _exit: _err: tsdbError("vgId:%d tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + tsdbFSRollback(pTsdb->fs); return code; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 415a674737..385a6b9d89 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -465,17 +465,27 @@ void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minK *maxKey = *minKey + minutes * tsTickPerMin[precision] - 1; } -// int tsdFidLevel(int fid, TSKEY now, minute) { -// if (fid >= pRtn->maxFid) { -// return 0; -// } else if (fid >= pRtn->midFid) { -// return 1; -// } else if (fid >= pRtn->minFid) { -// return 2; -// } else { -// return -1; -// } -// } +int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t now) { + int32_t aFid[3]; + TSKEY key; + + key = now - pKeepCfg->keep0 * tsTickPerMin[pKeepCfg->precision]; + aFid[0] = tsdbKeyFid(key, pKeepCfg->days, pKeepCfg->keep0); + key = now - pKeepCfg->keep1 * tsTickPerMin[pKeepCfg->precision]; + aFid[1] = tsdbKeyFid(key, pKeepCfg->days, pKeepCfg->keep1); + key = now - pKeepCfg->keep2 * tsTickPerMin[pKeepCfg->precision]; + aFid[2] = tsdbKeyFid(key, pKeepCfg->days, pKeepCfg->keep2); + + if (fid >= aFid[0]) { + return 0; + } else if (fid >= aFid[1]) { + return 1; + } else if (fid >= aFid[2]) { + return 2; + } else { + return -1; + } +} // TSDBROW ====================================================== void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) {