From 823aad4a5ec831854acf930017bc647a703bc819 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Thu, 31 Aug 2023 14:06:09 +0800 Subject: [PATCH] enh: record version range of STFile for data and stt files --- source/dnode/vnode/src/tsdb/tsdbDataFileRW.c | 62 ++++++++++++++++---- source/dnode/vnode/src/tsdb/tsdbSttFileRW.c | 32 ++++++++-- source/dnode/vnode/src/tsdb/tsdbUpgrade.c | 16 +++-- 3 files changed, 85 insertions(+), 25 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c index 3265bb7cc7..64c9e9d517 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c @@ -16,7 +16,7 @@ #include "tsdbDataFileRW.h" extern int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize, - TTombBlkArray *tombBlkArray, uint8_t **bufArr); + TTombBlkArray *tombBlkArray, uint8_t **bufArr, SVersionRange *range); extern int32_t tsdbFileWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize); // SDataFileReader ============================================= @@ -589,8 +589,8 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) { .fid = writer->config->fid, .cid = writer->config->cid, .size = 0, - .minVer = writer->config->minVer, - .maxVer = writer->config->maxVer, + .minVer = VERSION_MAX, + .maxVer = VERSION_MIN, }; // .data @@ -604,8 +604,8 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) { .fid = writer->config->fid, .cid = writer->config->cid, .size = 0, - .minVer = writer->config->minVer, - .maxVer = writer->config->maxVer, + .minVer = VERSION_MAX, + .maxVer = VERSION_MIN, }; } @@ -620,8 +620,8 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) { .fid = writer->config->fid, .cid = writer->config->cid, .size = 0, - .minVer = writer->config->minVer, - .maxVer = writer->config->maxVer, + .minVer = VERSION_MAX, + .maxVer = VERSION_MIN, }; } @@ -633,8 +633,8 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) { .fid = writer->config->fid, .cid = writer->config->cid, .size = 0, - .minVer = writer->config->minVer, - .maxVer = writer->config->maxVer, + .minVer = VERSION_MAX, + .maxVer = VERSION_MIN, }; writer->ctx->opened = true; @@ -646,8 +646,14 @@ _exit: return code; } +static int32_t tsdbSDataUpdVerRange(SDataFileWriterConfig *config, SVersionRange *range) { + config->minVer = TMIN(config->minVer, range->minVer); + config->maxVer = TMAX(config->maxVer, range->maxVer); + return 0; +} + int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAlg, int64_t *fileSize, - TBrinBlkArray *brinBlkArray, uint8_t **bufArr) { + TBrinBlkArray *brinBlkArray, uint8_t **bufArr, SVersionRange *range) { if (BRIN_BLOCK_SIZE(brinBlock) == 0) return 0; int32_t code; @@ -686,6 +692,9 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAl } } + range->minVer = brinBlk->minVer; + range->maxVer = brinBlk->maxVer; + // write to file for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr1); i++) { code = tsdbCmprData((uint8_t *)TARRAY2_DATA(brinBlock->dataArr1 + i), TARRAY2_DATA_LEN(brinBlock->dataArr1 + i), @@ -734,11 +743,15 @@ static int32_t tsdbDataFileWriteBrinBlock(SDataFileWriter *writer) { int32_t code = 0; int32_t lino = 0; + SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN}; code = tsdbFileWriteBrinBlock(writer->fd[TSDB_FTYPE_HEAD], writer->brinBlock, writer->config->cmprAlg, - &writer->files[TSDB_FTYPE_HEAD].size, writer->brinBlkArray, writer->config->bufArr); + &writer->files[TSDB_FTYPE_HEAD].size, writer->brinBlkArray, writer->config->bufArr, + &range); TSDB_CHECK_CODE(code, lino, _exit); + tsdbSDataUpdVerRange(writer->config, &range); + _exit: if (code) { TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); @@ -803,6 +816,9 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData } } + SVersionRange range = {.minVer = record->minVer, .maxVer = record->maxVer}; + tsdbSDataUpdVerRange(writer->config, &range); + // to .data file int32_t sizeArr[5] = {0}; @@ -1170,10 +1186,12 @@ static int32_t tsdbDataFileDoWriteTombBlock(SDataFileWriter *writer) { int32_t code = 0; int32_t lino = 0; - + + SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN}; code = tsdbFileWriteTombBlock(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlock, writer->config->cmprAlg, - &writer->files[TSDB_FTYPE_TOMB].size, writer->tombBlkArray, writer->config->bufArr); + &writer->files[TSDB_FTYPE_TOMB].size, writer->tombBlkArray, writer->config->bufArr, &range); TSDB_CHECK_CODE(code, lino, _exit); + tsdbSDataUpdVerRange(writer->config, &range); _exit: if (code) { @@ -1358,6 +1376,8 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr .fid = writer->config->fid, .nf = writer->files[ftype], }; + op.nf.minVer = TMIN(op.nf.minVer, writer->config->minVer); + op.nf.maxVer = TMAX(op.nf.maxVer, writer->config->maxVer); code = TARRAY2_APPEND(opArr, op); TSDB_CHECK_CODE(code, lino, _exit); @@ -1369,6 +1389,8 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr .fid = writer->config->fid, .nf = writer->files[ftype], }; + op.nf.minVer = TMIN(op.nf.minVer, writer->config->minVer); + op.nf.maxVer = TMAX(op.nf.maxVer, writer->config->maxVer); code = TARRAY2_APPEND(opArr, op); TSDB_CHECK_CODE(code, lino, _exit); } else if (writer->config->files[ftype].file.size != writer->files[ftype].size) { @@ -1378,6 +1400,8 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr .of = writer->config->files[ftype].file, .nf = writer->files[ftype], }; + op.nf.minVer = TMIN(op.nf.minVer, writer->config->minVer); + op.nf.maxVer = TMAX(op.nf.maxVer, writer->config->maxVer); code = TARRAY2_APPEND(opArr, op); TSDB_CHECK_CODE(code, lino, _exit); } @@ -1390,6 +1414,8 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr .fid = writer->config->fid, .nf = writer->files[ftype], }; + op.nf.minVer = TMIN(op.nf.minVer, writer->config->minVer); + op.nf.maxVer = TMAX(op.nf.maxVer, writer->config->maxVer); code = TARRAY2_APPEND(opArr, op); TSDB_CHECK_CODE(code, lino, _exit); } else if (writer->config->files[ftype].file.size != writer->files[ftype].size) { @@ -1399,6 +1425,8 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr .of = writer->config->files[ftype].file, .nf = writer->files[ftype], }; + op.nf.minVer = TMIN(op.nf.minVer, writer->config->minVer); + op.nf.maxVer = TMAX(op.nf.maxVer, writer->config->maxVer); code = TARRAY2_APPEND(opArr, op); TSDB_CHECK_CODE(code, lino, _exit); } @@ -1438,6 +1466,8 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr .fid = writer->config->fid, .nf = writer->files[ftype], }; + op.nf.minVer = TMIN(op.nf.minVer, writer->config->minVer); + op.nf.maxVer = TMAX(op.nf.maxVer, writer->config->maxVer); code = TARRAY2_APPEND(opArr, op); TSDB_CHECK_CODE(code, lino, _exit); } @@ -1447,9 +1477,14 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr code = tsdbFsyncFile(writer->fd[i]); TSDB_CHECK_CODE(code, lino, _exit); tsdbCloseFile(&writer->fd[i]); + writer->files[i].minVer = TMIN(writer->files[i].minVer, writer->config->minVer); + writer->files[i].maxVer = TMAX(writer->files[i].maxVer, writer->config->maxVer); } } + writer->config->minVer = VERSION_MAX; + writer->config->maxVer = VERSION_MIN; + _exit: if (code) { TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); @@ -1606,6 +1641,7 @@ int32_t tsdbDataFileWriteBlockData(SDataFileWriter *writer, SBlockData *bData) { ) { code = tsdbDataFileDoWriteBlockData(writer, bData); TSDB_CHECK_CODE(code, lino, _exit); + } else { for (int32_t i = 0; i < bData->nRow; ++i) { TSDBROW row[1] = {tsdbRowFromBlockData(bData, i)}; diff --git a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c index 4f1eb49959..eaa5e40726 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c @@ -401,8 +401,14 @@ struct SSttFileWriter { uint8_t *bufArr[5]; }; -int32_t tsdbFileDoWriteBlockData(STsdbFD *fd, SBlockData *blockData, int8_t cmprAlg, int64_t *fileSize, - TSttBlkArray *sttBlkArray, uint8_t **bufArr) { +static int32_t tsdbSSttUpdVerRange(SSttFileWriterConfig *config, SVersionRange *range) { + config->minVer = TMIN(config->minVer, range->minVer); + config->maxVer = TMAX(config->maxVer, range->maxVer); + return 0; +} + +static int32_t tsdbFileDoWriteSttBlockData(STsdbFD *fd, SBlockData *blockData, int8_t cmprAlg, int64_t *fileSize, + TSttBlkArray *sttBlkArray, uint8_t **bufArr, SVersionRange *range) { if (blockData->nRow == 0) return 0; int32_t code = 0; @@ -425,6 +431,9 @@ int32_t tsdbFileDoWriteBlockData(STsdbFD *fd, SBlockData *blockData, int8_t cmpr if (sttBlk->maxVer < blockData->aVersion[iRow]) sttBlk->maxVer = blockData->aVersion[iRow]; } + range->minVer = sttBlk->minVer; + range->maxVer = sttBlk->maxVer; + int32_t sizeArr[5] = {0}; code = tCmprBlockData(blockData, cmprAlg, NULL, NULL, bufArr, sizeArr); if (code) return code; @@ -455,9 +464,11 @@ static int32_t tsdbSttFileDoWriteBlockData(SSttFileWriter *writer) { int32_t code = 0; int32_t lino = 0; - code = tsdbFileDoWriteBlockData(writer->fd, writer->blockData, writer->config->cmprAlg, &writer->file->size, - writer->sttBlkArray, writer->config->bufArr); + SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN}; + code = tsdbFileDoWriteSttBlockData(writer->fd, writer->blockData, writer->config->cmprAlg, &writer->file->size, + writer->sttBlkArray, writer->config->bufArr, &range); TSDB_CHECK_CODE(code, lino, _exit); + tsdbSSttUpdVerRange(writer->config, &range); _exit: if (code) { @@ -518,7 +529,7 @@ _exit: } int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize, - TTombBlkArray *tombBlkArray, uint8_t **bufArr) { + TTombBlkArray *tombBlkArray, uint8_t **bufArr, SVersionRange *range) { int32_t code; if (TOMB_BLOCK_SIZE(tombBlock) == 0) return 0; @@ -554,6 +565,9 @@ int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAl } } + range->minVer = tombBlk->minVer; + range->maxVer = tombBlk->maxVer; + for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->dataArr); i++) { code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&tombBlock->dataArr[i]), TARRAY2_DATA_LEN(&tombBlock->dataArr[i]), TSDB_DATA_TYPE_BIGINT, tombBlk->cmprAlg, &bufArr[0], 0, &tombBlk->size[i], &bufArr[1]); @@ -579,9 +593,11 @@ static int32_t tsdbSttFileDoWriteTombBlock(SSttFileWriter *writer) { int32_t code = 0; int32_t lino = 0; + SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN}; code = tsdbFileWriteTombBlock(writer->fd, writer->tombBlock, writer->config->cmprAlg, &writer->file->size, - writer->tombBlkArray, writer->config->bufArr); + writer->tombBlkArray, writer->config->bufArr, &range); TSDB_CHECK_CODE(code, lino, _exit); + tsdbSSttUpdVerRange(writer->config, &range); _exit: if (code) { @@ -784,6 +800,10 @@ static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, TFileOpArray *o .fid = writer->config->fid, .nf = writer->file[0], }; + op.nf.minVer = TMIN(op.nf.minVer, writer->config->minVer); + op.nf.maxVer = TMAX(op.nf.maxVer, writer->config->maxVer); + writer->config->minVer = VERSION_MAX; + writer->config->maxVer = VERSION_MIN; code = TARRAY2_APPEND(opArray, op); TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c index 225822ed97..16f649cd9d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c +++ b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c @@ -23,7 +23,7 @@ extern int32_t tsdbReadDataBlockEx(SDataFReader *pReader, SDataBlk *pDataBlk, SB extern int32_t save_fs(const TFileSetArray *arr, const char *fname); extern int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype); extern int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAlg, int64_t *fileSize, - TBrinBlkArray *brinBlkArray, uint8_t **bufArr); + TBrinBlkArray *brinBlkArray, uint8_t **bufArr, SVersionRange *range); extern int32_t tsdbFileWriteBrinBlk(STsdbFD *fd, TBrinBlkArray *brinBlkArray, SFDataPtr *ptr, int64_t *fileSize); extern int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFooter *footer); extern int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl); @@ -31,7 +31,7 @@ extern int32_t tsdbSttLvlClear(SSttLvl **lvl); extern int32_t tsdbFileWriteSttBlk(STsdbFD *fd, const TSttBlkArray *sttBlkArray, SFDataPtr *ptr, int64_t *fileSize); extern int32_t tsdbFileWriteSttFooter(STsdbFD *fd, const SSttFooter *footer, int64_t *fileSize); extern int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize, - TTombBlkArray *tombBlkArray, uint8_t **bufArr); + TTombBlkArray *tombBlkArray, uint8_t **bufArr, SVersionRange *range); extern int32_t tsdbFileWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize); extern int32_t tsdbFileWriteTombFooter(STsdbFD *fd, const STombFooter *footer, int64_t *fileSize); @@ -129,16 +129,18 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader * TSDB_CHECK_CODE(code, lino, _exit); if (BRIN_BLOCK_SIZE(ctx->brinBlock) >= ctx->maxRow) { + SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN}; code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size, - ctx->brinBlkArray, ctx->bufArr); + ctx->brinBlkArray, ctx->bufArr, &range); TSDB_CHECK_CODE(code, lino, _exit); } } } if (BRIN_BLOCK_SIZE(ctx->brinBlock) > 0) { + SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN}; code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size, - ctx->brinBlkArray, ctx->bufArr); + ctx->brinBlkArray, ctx->bufArr, &range); TSDB_CHECK_CODE(code, lino, _exit); } @@ -493,8 +495,9 @@ static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray * code = tsdbUpgradeOpenTombFile(tsdb, fset, &ctx->fd, &ctx->fobj, &ctx->toStt); TSDB_CHECK_CODE(code, lino, _exit); } + SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN}; code = tsdbFileWriteTombBlock(ctx->fd, ctx->tombBlock, ctx->cmprAlg, &ctx->fobj->f->size, ctx->tombBlkArray, - ctx->bufArr); + ctx->bufArr, &range); TSDB_CHECK_CODE(code, lino, _exit); } } @@ -505,8 +508,9 @@ static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray * code = tsdbUpgradeOpenTombFile(tsdb, fset, &ctx->fd, &ctx->fobj, &ctx->toStt); TSDB_CHECK_CODE(code, lino, _exit); } + SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN}; code = tsdbFileWriteTombBlock(ctx->fd, ctx->tombBlock, ctx->cmprAlg, &ctx->fobj->f->size, ctx->tombBlkArray, - ctx->bufArr); + ctx->bufArr, &range); TSDB_CHECK_CODE(code, lino, _exit); }