feat: filter pExclude in tsdbFSCreateCopyRangedSnapshot
This commit is contained in:
parent
e1c03118ab
commit
0c41fa56dc
|
@ -301,7 +301,7 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type
|
||||||
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader);
|
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader);
|
||||||
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData);
|
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData);
|
||||||
// STsdbSnapWriter ========================================
|
// STsdbSnapWriter ========================================
|
||||||
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter);
|
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, void* pRanges, STsdbSnapWriter** ppWriter);
|
||||||
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr);
|
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr);
|
||||||
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter);
|
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter);
|
||||||
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback);
|
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback);
|
||||||
|
@ -358,7 +358,7 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapRead
|
||||||
int32_t rsmaSnapReaderClose(SRSmaSnapReader** ppReader);
|
int32_t rsmaSnapReaderClose(SRSmaSnapReader** ppReader);
|
||||||
int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData);
|
int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData);
|
||||||
// SRSmaSnapWriter ========================================
|
// SRSmaSnapWriter ========================================
|
||||||
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWriter** ppWriter);
|
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, void* pRanges, SRSmaSnapWriter** ppWriter);
|
||||||
int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
|
int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
|
||||||
int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback);
|
int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback);
|
||||||
|
|
||||||
|
|
|
@ -128,7 +128,7 @@ struct SRSmaSnapWriter {
|
||||||
STsdbSnapWriter* pDataWriter[TSDB_RETENTION_L2];
|
STsdbSnapWriter* pDataWriter[TSDB_RETENTION_L2];
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWriter** ppWriter) {
|
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, void* pRanges, SRSmaSnapWriter** ppWriter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
SVnode* pVnode = pSma->pVnode;
|
SVnode* pVnode = pSma->pVnode;
|
||||||
|
@ -147,7 +147,7 @@ int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWrit
|
||||||
// rsma1/rsma2
|
// rsma1/rsma2
|
||||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||||
if (pSma->pRSmaTsdb[i]) {
|
if (pSma->pRSmaTsdb[i]) {
|
||||||
code = tsdbSnapWriterOpen(pSma->pRSmaTsdb[i], sver, ever, &pWriter->pDataWriter[i]);
|
code = tsdbSnapWriterOpen(pSma->pRSmaTsdb[i], sver, ever, pRanges, &pWriter->pDataWriter[i]);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -983,6 +983,50 @@ int32_t tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TSnapRangeArray *pExclude, TFileSetArray **fsetArr,
|
||||||
|
TFileOpArray *fopArr) {
|
||||||
|
int32_t code = 0;
|
||||||
|
STFileSet *fset;
|
||||||
|
STFileSet *fset1;
|
||||||
|
|
||||||
|
fsetArr[0] = taosMemoryMalloc(sizeof(TFileSetArray));
|
||||||
|
if (fsetArr == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
|
TARRAY2_INIT(fsetArr[0]);
|
||||||
|
|
||||||
|
int32_t i = 0;
|
||||||
|
|
||||||
|
taosThreadRwlockRdlock(&fs->tsdb->rwLock);
|
||||||
|
TARRAY2_FOREACH(fs->fSetArr, fset) {
|
||||||
|
int64_t ever = VERSION_MAX;
|
||||||
|
while (pExclude && i < TARRAY2_SIZE(pExclude)) {
|
||||||
|
STSnapRange *u = TARRAY2_GET(pExclude, i);
|
||||||
|
if (fset->fid > u->fid) {
|
||||||
|
i++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (fset->fid == u->fid) {
|
||||||
|
ever = u->sver - 1;
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tsdbTFileSetFilteredInitDup(fs->tsdb, fset, ever, &fset1, fopArr);
|
||||||
|
if (code) break;
|
||||||
|
|
||||||
|
code = TARRAY2_APPEND(fsetArr[0], fset1);
|
||||||
|
if (code) break;
|
||||||
|
}
|
||||||
|
taosThreadRwlockUnlock(&fs->tsdb->rwLock);
|
||||||
|
|
||||||
|
if (code) {
|
||||||
|
TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
|
||||||
|
taosMemoryFree(fsetArr[0]);
|
||||||
|
fsetArr[0] = NULL;
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TSnapRangeArray *pRanges,
|
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TSnapRangeArray *pRanges,
|
||||||
TSnapRangeArray **fsrArr) {
|
TSnapRangeArray **fsrArr) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
@ -1009,12 +1053,12 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev
|
||||||
i++;
|
i++;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fset->fid == u->fid) {
|
if (fset->fid == u->fid) {
|
||||||
sver1 = u->sver;
|
sver1 = u->sver;
|
||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbInfo("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1);
|
tsdbInfo("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1);
|
||||||
|
|
||||||
code = tsdbTSnapRangeInitRef(fs->tsdb, fset, sver1, ever1, &fsr1);
|
code = tsdbTSnapRangeInitRef(fs->tsdb, fset, sver1, ever1, &fsr1);
|
||||||
|
|
|
@ -52,9 +52,12 @@ int32_t tsdbCloseFS(STFileSystem **fs);
|
||||||
int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr);
|
int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr);
|
||||||
int32_t tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr);
|
int32_t tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr);
|
||||||
int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr);
|
int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr);
|
||||||
int32_t tsdbFSDestroyRefSnapshot(TFileSetArray **fsrArr);
|
int32_t tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr);
|
||||||
|
|
||||||
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TSnapRangeArray *pEx,
|
int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TSnapRangeArray *pExclude, TFileSetArray **fsetArr,
|
||||||
|
TFileOpArray *fopArr);
|
||||||
|
int32_t tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr, TFileOpArray *fopArr);
|
||||||
|
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TSnapRangeArray *pRanges,
|
||||||
TSnapRangeArray **fsrArr);
|
TSnapRangeArray **fsrArr);
|
||||||
int32_t tsdbFSDestroyRefRangedSnapshot(TSnapRangeArray **fsrArr);
|
int32_t tsdbFSDestroyRefRangedSnapshot(TSnapRangeArray **fsrArr);
|
||||||
// txn
|
// txn
|
||||||
|
|
|
@ -65,6 +65,34 @@ static int32_t tsdbSttLvlInitRef(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl **lv
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbSttLvlFilteredInitEx(STsdb *pTsdb, const SSttLvl *lvl1, int64_t ever, SSttLvl **lvl,
|
||||||
|
TFileOpArray *fopArr) {
|
||||||
|
int32_t code = tsdbSttLvlInit(lvl1->level, lvl);
|
||||||
|
if (code) return code;
|
||||||
|
|
||||||
|
const STFileObj *fobj1;
|
||||||
|
TARRAY2_FOREACH(lvl1->fobjArr, fobj1) {
|
||||||
|
if (fobj1->f->maxVer <= ever) {
|
||||||
|
STFileObj *fobj;
|
||||||
|
code = tsdbTFileObjInit(pTsdb, fobj1->f, &fobj);
|
||||||
|
if (code) {
|
||||||
|
tsdbSttLvlClear(lvl);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
TARRAY2_APPEND(lvl[0]->fobjArr, fobj);
|
||||||
|
} else {
|
||||||
|
STFileOp op = {
|
||||||
|
.optype = TSDB_FOP_REMOVE,
|
||||||
|
.fid = fobj1->f->fid,
|
||||||
|
.of = fobj1->f[0],
|
||||||
|
};
|
||||||
|
TARRAY2_APPEND(fopArr, op);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static void tsdbSttLvlRemoveFObj(void *data) { tsdbTFileObjRemove(*(STFileObj **)data); }
|
static void tsdbSttLvlRemoveFObj(void *data) { tsdbTFileObjRemove(*(STFileObj **)data); }
|
||||||
static void tsdbSttLvlRemove(SSttLvl **lvl) {
|
static void tsdbSttLvlRemove(SSttLvl **lvl) {
|
||||||
TARRAY2_DESTROY(lvl[0]->fobjArr, tsdbSttLvlRemoveFObj);
|
TARRAY2_DESTROY(lvl[0]->fobjArr, tsdbSttLvlRemoveFObj);
|
||||||
|
@ -458,6 +486,46 @@ int32_t tsdbTFileSetInitDup(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fs
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tsdbTFileSetFilteredInitDup(STsdb *pTsdb, const STFileSet *fset1, int64_t ever, STFileSet **fset,
|
||||||
|
TFileOpArray *fopArr) {
|
||||||
|
int32_t code = tsdbTFileSetInit(fset1->fid, fset);
|
||||||
|
if (code) return code;
|
||||||
|
|
||||||
|
for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
|
||||||
|
if (fset1->farr[ftype] == NULL) continue;
|
||||||
|
STFileObj *fobj = fset1->farr[ftype];
|
||||||
|
if (fobj->f->maxVer <= ever) {
|
||||||
|
code = tsdbTFileObjInit(pTsdb, fobj->f, &fset[0]->farr[ftype]);
|
||||||
|
if (code) {
|
||||||
|
tsdbTFileSetClear(fset);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
STFileOp op = {
|
||||||
|
.optype = TSDB_FOP_REMOVE,
|
||||||
|
.fid = fobj->f->fid,
|
||||||
|
.of = fobj->f[0],
|
||||||
|
};
|
||||||
|
TARRAY2_APPEND(fopArr, op);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const SSttLvl *lvl1;
|
||||||
|
TARRAY2_FOREACH(fset1->lvlArr, lvl1) {
|
||||||
|
SSttLvl *lvl;
|
||||||
|
code = tsdbSttLvlFilteredInitEx(pTsdb, lvl1, ever, &lvl, fopArr);
|
||||||
|
if (code) {
|
||||||
|
tsdbTFileSetClear(fset);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = TARRAY2_APPEND(fset[0]->lvlArr, lvl);
|
||||||
|
if (code) return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tsdbTSnapRangeInitRef(STsdb *pTsdb, const STFileSet *fset1, int64_t sver, int64_t ever, STSnapRange **fsr) {
|
int32_t tsdbTSnapRangeInitRef(STsdb *pTsdb, const STFileSet *fset1, int64_t sver, int64_t ever, STSnapRange **fsr) {
|
||||||
fsr[0] = taosMemoryCalloc(1, sizeof(*fsr[0]));
|
fsr[0] = taosMemoryCalloc(1, sizeof(*fsr[0]));
|
||||||
if (fsr[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
if (fsr[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
|
@ -46,6 +46,9 @@ int32_t tsdbTFileSetInitRef(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fs
|
||||||
int32_t tsdbTFileSetClear(STFileSet **fset);
|
int32_t tsdbTFileSetClear(STFileSet **fset);
|
||||||
int32_t tsdbTFileSetRemove(STFileSet **fset);
|
int32_t tsdbTFileSetRemove(STFileSet **fset);
|
||||||
|
|
||||||
|
int32_t tsdbTFileSetFilteredInitDup(STsdb *pTsdb, const STFileSet *fset1, int64_t ever, STFileSet **fset,
|
||||||
|
TFileOpArray *fopArr);
|
||||||
|
|
||||||
int32_t tsdbTSnapRangeInitRef(STsdb *pTsdb, const STFileSet *fset1, int64_t sver, int64_t ever, STSnapRange **fsr);
|
int32_t tsdbTSnapRangeInitRef(STsdb *pTsdb, const STFileSet *fset1, int64_t sver, int64_t ever, STSnapRange **fsr);
|
||||||
int32_t tsdbTSnapRangeClear(STSnapRange **fsr);
|
int32_t tsdbTSnapRangeClear(STSnapRange **fsr);
|
||||||
|
|
||||||
|
|
|
@ -1028,7 +1028,7 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** writer) {
|
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, void* pRanges, STsdbSnapWriter** writer) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
@ -1052,7 +1052,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
|
||||||
writer[0]->compactVersion = INT64_MAX;
|
writer[0]->compactVersion = INT64_MAX;
|
||||||
writer[0]->now = taosGetTimestampMs();
|
writer[0]->now = taosGetTimestampMs();
|
||||||
|
|
||||||
code = tsdbFSCreateCopySnapshot(pTsdb->pFS, &writer[0]->fsetArr);
|
code = tsdbFSCreateCopyRangedSnapshot(pTsdb->pFS, (TSnapRangeArray*)pRanges, &writer[0]->fsetArr, writer[0]->fopArr);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
|
|
@ -158,6 +158,11 @@ void vnodeSnapReaderClose(SVSnapReader *pReader) {
|
||||||
if (pReader->pRanges) {
|
if (pReader->pRanges) {
|
||||||
tsdbSnapRangeArrayDestroy(&pReader->pRanges);
|
tsdbSnapRangeArrayDestroy(&pReader->pRanges);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pReader->pRsmaRanges) {
|
||||||
|
tsdbSnapRangeArrayDestroy(&pReader->pRsmaRanges);
|
||||||
|
}
|
||||||
|
|
||||||
taosMemoryFree(pReader);
|
taosMemoryFree(pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -543,6 +548,10 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pWriter->pRanges) {
|
||||||
|
tsdbSnapRangeArrayDestroy(&pWriter->pRanges);
|
||||||
|
}
|
||||||
|
|
||||||
if (pWriter->pTsdbSnapWriter) {
|
if (pWriter->pTsdbSnapWriter) {
|
||||||
code = tsdbSnapWriterClose(&pWriter->pTsdbSnapWriter, rollback);
|
code = tsdbSnapWriterClose(&pWriter->pTsdbSnapWriter, rollback);
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
|
@ -577,6 +586,10 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pWriter->pRsmaRanges) {
|
||||||
|
tsdbSnapRangeArrayDestroy(&pWriter->pRsmaRanges);
|
||||||
|
}
|
||||||
|
|
||||||
if (pWriter->pRsmaSnapWriter) {
|
if (pWriter->pRsmaSnapWriter) {
|
||||||
code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback);
|
code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback);
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
|
@ -663,7 +676,8 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
|
||||||
case SNAP_DATA_DEL: {
|
case SNAP_DATA_DEL: {
|
||||||
// tsdb
|
// tsdb
|
||||||
if (pWriter->pTsdbSnapWriter == NULL) {
|
if (pWriter->pTsdbSnapWriter == NULL) {
|
||||||
code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter);
|
code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, pWriter->pRanges,
|
||||||
|
&pWriter->pTsdbSnapWriter);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -723,7 +737,8 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
|
||||||
case SNAP_DATA_QTASK: {
|
case SNAP_DATA_QTASK: {
|
||||||
// rsma1/rsma2/qtask for rsma
|
// rsma1/rsma2/qtask for rsma
|
||||||
if (pWriter->pRsmaSnapWriter == NULL) {
|
if (pWriter->pRsmaSnapWriter == NULL) {
|
||||||
code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, &pWriter->pRsmaSnapWriter);
|
code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, pWriter->pRsmaRanges,
|
||||||
|
&pWriter->pRsmaSnapWriter);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue