From 0c41fa56dc113ed5294eb2d27b1fc4a332631710 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Mon, 18 Sep 2023 14:46:26 +0800 Subject: [PATCH] feat: filter pExclude in tsdbFSCreateCopyRangedSnapshot --- source/dnode/vnode/src/inc/vnodeInt.h | 4 +- source/dnode/vnode/src/sma/smaSnapshot.c | 4 +- source/dnode/vnode/src/tsdb/tsdbFS2.c | 46 ++++++++++++++- source/dnode/vnode/src/tsdb/tsdbFS2.h | 7 ++- source/dnode/vnode/src/tsdb/tsdbFSet2.c | 68 ++++++++++++++++++++++ source/dnode/vnode/src/tsdb/tsdbFSet2.h | 3 + source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 4 +- source/dnode/vnode/src/vnd/vnodeSnapshot.c | 19 +++++- 8 files changed, 144 insertions(+), 11 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 27a393abf4..d5e1585af4 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -301,7 +301,7 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader); int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData); // STsdbSnapWriter ======================================== -int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter); +int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, void* pRanges, STsdbSnapWriter** ppWriter); int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr); int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter); int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback); @@ -358,7 +358,7 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapRead int32_t rsmaSnapReaderClose(SRSmaSnapReader** ppReader); int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData); // SRSmaSnapWriter ======================================== -int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWriter** ppWriter); +int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, void* pRanges, SRSmaSnapWriter** ppWriter); int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData); int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback); diff --git a/source/dnode/vnode/src/sma/smaSnapshot.c b/source/dnode/vnode/src/sma/smaSnapshot.c index 2a010f5a84..ca46b4728f 100644 --- a/source/dnode/vnode/src/sma/smaSnapshot.c +++ b/source/dnode/vnode/src/sma/smaSnapshot.c @@ -128,7 +128,7 @@ struct SRSmaSnapWriter { STsdbSnapWriter* pDataWriter[TSDB_RETENTION_L2]; }; -int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWriter** ppWriter) { +int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, void* pRanges, SRSmaSnapWriter** ppWriter) { int32_t code = 0; int32_t lino = 0; SVnode* pVnode = pSma->pVnode; @@ -147,7 +147,7 @@ int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWrit // rsma1/rsma2 for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pSma->pRSmaTsdb[i]) { - code = tsdbSnapWriterOpen(pSma->pRSmaTsdb[i], sver, ever, &pWriter->pDataWriter[i]); + code = tsdbSnapWriterOpen(pSma->pRSmaTsdb[i], sver, ever, pRanges, &pWriter->pDataWriter[i]); TSDB_CHECK_CODE(code, lino, _exit); } } diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index d71473b079..759fded522 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -983,6 +983,50 @@ int32_t tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr) { return 0; } +int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TSnapRangeArray *pExclude, TFileSetArray **fsetArr, + TFileOpArray *fopArr) { + int32_t code = 0; + STFileSet *fset; + STFileSet *fset1; + + fsetArr[0] = taosMemoryMalloc(sizeof(TFileSetArray)); + if (fsetArr == NULL) return TSDB_CODE_OUT_OF_MEMORY; + + TARRAY2_INIT(fsetArr[0]); + + int32_t i = 0; + + taosThreadRwlockRdlock(&fs->tsdb->rwLock); + TARRAY2_FOREACH(fs->fSetArr, fset) { + int64_t ever = VERSION_MAX; + while (pExclude && i < TARRAY2_SIZE(pExclude)) { + STSnapRange *u = TARRAY2_GET(pExclude, i); + if (fset->fid > u->fid) { + i++; + continue; + } + if (fset->fid == u->fid) { + ever = u->sver - 1; + i++; + } + } + + code = tsdbTFileSetFilteredInitDup(fs->tsdb, fset, ever, &fset1, fopArr); + if (code) break; + + code = TARRAY2_APPEND(fsetArr[0], fset1); + if (code) break; + } + taosThreadRwlockUnlock(&fs->tsdb->rwLock); + + if (code) { + TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear); + taosMemoryFree(fsetArr[0]); + fsetArr[0] = NULL; + } + return code; +} + int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TSnapRangeArray *pRanges, TSnapRangeArray **fsrArr) { int32_t code = -1; @@ -1009,12 +1053,12 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev i++; continue; } - if (fset->fid == u->fid) { sver1 = u->sver; i++; } } + tsdbInfo("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1); code = tsdbTSnapRangeInitRef(fs->tsdb, fset, sver1, ever1, &fsr1); diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.h b/source/dnode/vnode/src/tsdb/tsdbFS2.h index f8b87b8a84..73b27a8fbd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.h @@ -52,9 +52,12 @@ int32_t tsdbCloseFS(STFileSystem **fs); int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr); int32_t tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr); int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr); -int32_t tsdbFSDestroyRefSnapshot(TFileSetArray **fsrArr); +int32_t tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr); -int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TSnapRangeArray *pEx, +int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TSnapRangeArray *pExclude, TFileSetArray **fsetArr, + TFileOpArray *fopArr); +int32_t tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr, TFileOpArray *fopArr); +int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TSnapRangeArray *pRanges, TSnapRangeArray **fsrArr); int32_t tsdbFSDestroyRefRangedSnapshot(TSnapRangeArray **fsrArr); // txn diff --git a/source/dnode/vnode/src/tsdb/tsdbFSet2.c b/source/dnode/vnode/src/tsdb/tsdbFSet2.c index e0434b7da6..dd86d01598 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSet2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFSet2.c @@ -65,6 +65,34 @@ static int32_t tsdbSttLvlInitRef(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl **lv return 0; } +static int32_t tsdbSttLvlFilteredInitEx(STsdb *pTsdb, const SSttLvl *lvl1, int64_t ever, SSttLvl **lvl, + TFileOpArray *fopArr) { + int32_t code = tsdbSttLvlInit(lvl1->level, lvl); + if (code) return code; + + const STFileObj *fobj1; + TARRAY2_FOREACH(lvl1->fobjArr, fobj1) { + if (fobj1->f->maxVer <= ever) { + STFileObj *fobj; + code = tsdbTFileObjInit(pTsdb, fobj1->f, &fobj); + if (code) { + tsdbSttLvlClear(lvl); + return code; + } + + TARRAY2_APPEND(lvl[0]->fobjArr, fobj); + } else { + STFileOp op = { + .optype = TSDB_FOP_REMOVE, + .fid = fobj1->f->fid, + .of = fobj1->f[0], + }; + TARRAY2_APPEND(fopArr, op); + } + } + return 0; +} + static void tsdbSttLvlRemoveFObj(void *data) { tsdbTFileObjRemove(*(STFileObj **)data); } static void tsdbSttLvlRemove(SSttLvl **lvl) { TARRAY2_DESTROY(lvl[0]->fobjArr, tsdbSttLvlRemoveFObj); @@ -458,6 +486,46 @@ int32_t tsdbTFileSetInitDup(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fs return 0; } +int32_t tsdbTFileSetFilteredInitDup(STsdb *pTsdb, const STFileSet *fset1, int64_t ever, STFileSet **fset, + TFileOpArray *fopArr) { + int32_t code = tsdbTFileSetInit(fset1->fid, fset); + if (code) return code; + + for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) { + if (fset1->farr[ftype] == NULL) continue; + STFileObj *fobj = fset1->farr[ftype]; + if (fobj->f->maxVer <= ever) { + code = tsdbTFileObjInit(pTsdb, fobj->f, &fset[0]->farr[ftype]); + if (code) { + tsdbTFileSetClear(fset); + return code; + } + } else { + STFileOp op = { + .optype = TSDB_FOP_REMOVE, + .fid = fobj->f->fid, + .of = fobj->f[0], + }; + TARRAY2_APPEND(fopArr, op); + } + } + + const SSttLvl *lvl1; + TARRAY2_FOREACH(fset1->lvlArr, lvl1) { + SSttLvl *lvl; + code = tsdbSttLvlFilteredInitEx(pTsdb, lvl1, ever, &lvl, fopArr); + if (code) { + tsdbTFileSetClear(fset); + return code; + } + + code = TARRAY2_APPEND(fset[0]->lvlArr, lvl); + if (code) return code; + } + + return 0; +} + int32_t tsdbTSnapRangeInitRef(STsdb *pTsdb, const STFileSet *fset1, int64_t sver, int64_t ever, STSnapRange **fsr) { fsr[0] = taosMemoryCalloc(1, sizeof(*fsr[0])); if (fsr[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/dnode/vnode/src/tsdb/tsdbFSet2.h b/source/dnode/vnode/src/tsdb/tsdbFSet2.h index e6d78a8cfe..c78cc179df 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSet2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFSet2.h @@ -46,6 +46,9 @@ int32_t tsdbTFileSetInitRef(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fs int32_t tsdbTFileSetClear(STFileSet **fset); int32_t tsdbTFileSetRemove(STFileSet **fset); +int32_t tsdbTFileSetFilteredInitDup(STsdb *pTsdb, const STFileSet *fset1, int64_t ever, STFileSet **fset, + TFileOpArray *fopArr); + int32_t tsdbTSnapRangeInitRef(STsdb *pTsdb, const STFileSet *fset1, int64_t sver, int64_t ever, STSnapRange **fsr); int32_t tsdbTSnapRangeClear(STSnapRange **fsr); diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 9710eee6c4..3b74475870 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -1028,7 +1028,7 @@ _exit: return code; } -int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** writer) { +int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, void* pRanges, STsdbSnapWriter** writer) { int32_t code = 0; int32_t lino = 0; @@ -1052,7 +1052,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr writer[0]->compactVersion = INT64_MAX; writer[0]->now = taosGetTimestampMs(); - code = tsdbFSCreateCopySnapshot(pTsdb->pFS, &writer[0]->fsetArr); + code = tsdbFSCreateCopyRangedSnapshot(pTsdb->pFS, (TSnapRangeArray*)pRanges, &writer[0]->fsetArr, writer[0]->fopArr); TSDB_CHECK_CODE(code, lino, _exit); _exit: diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index cf04471be0..b2fbdc07e9 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -158,6 +158,11 @@ void vnodeSnapReaderClose(SVSnapReader *pReader) { if (pReader->pRanges) { tsdbSnapRangeArrayDestroy(&pReader->pRanges); } + + if (pReader->pRsmaRanges) { + tsdbSnapRangeArrayDestroy(&pReader->pRsmaRanges); + } + taosMemoryFree(pReader); } @@ -543,6 +548,10 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * if (code) goto _exit; } + if (pWriter->pRanges) { + tsdbSnapRangeArrayDestroy(&pWriter->pRanges); + } + if (pWriter->pTsdbSnapWriter) { code = tsdbSnapWriterClose(&pWriter->pTsdbSnapWriter, rollback); if (code) goto _exit; @@ -577,6 +586,10 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * if (code) goto _exit; } + if (pWriter->pRsmaRanges) { + tsdbSnapRangeArrayDestroy(&pWriter->pRsmaRanges); + } + if (pWriter->pRsmaSnapWriter) { code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback); if (code) goto _exit; @@ -663,7 +676,8 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { case SNAP_DATA_DEL: { // tsdb if (pWriter->pTsdbSnapWriter == NULL) { - code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter); + code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, pWriter->pRanges, + &pWriter->pTsdbSnapWriter); if (code) goto _err; } @@ -723,7 +737,8 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { case SNAP_DATA_QTASK: { // rsma1/rsma2/qtask for rsma if (pWriter->pRsmaSnapWriter == NULL) { - code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, &pWriter->pRsmaSnapWriter); + code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, pWriter->pRsmaRanges, + &pWriter->pRsmaSnapWriter); if (code) goto _err; }