enh: maintain independent version range of tombs for SDataFileWriter

This commit is contained in:
Benguang Zhao 2023-09-06 11:43:13 +08:00
parent 4540bcb170
commit 0ddbcd50d0
2 changed files with 9 additions and 7 deletions

View File

@ -489,6 +489,7 @@ struct SDataFileWriter {
int32_t tombBlockIdx; int32_t tombBlockIdx;
// range // range
SVersionRange range; SVersionRange range;
SVersionRange tombRange;
} ctx[1]; } ctx[1];
STFile files[TSDB_FTYPE_MAX]; STFile files[TSDB_FTYPE_MAX];
@ -637,6 +638,7 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) {
// range // range
writer->ctx->range = (SVersionRange){.minVer = VERSION_MAX, .maxVer = VERSION_MIN}; writer->ctx->range = (SVersionRange){.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
writer->ctx->tombRange = (SVersionRange){.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
writer->ctx->opened = true; writer->ctx->opened = true;
@ -1243,7 +1245,7 @@ static int32_t tsdbDataFileDoWriteTombBlock(SDataFileWriter *writer) {
code = tsdbFileWriteTombBlock(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlock, writer->config->cmprAlg, code = tsdbFileWriteTombBlock(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlock, writer->config->cmprAlg,
&writer->files[TSDB_FTYPE_TOMB].size, writer->tombBlkArray, writer->config->bufArr, &writer->files[TSDB_FTYPE_TOMB].size, writer->tombBlkArray, writer->config->bufArr,
&writer->ctx->range); &writer->ctx->tombRange);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
@ -1543,7 +1545,7 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
.nf = writer->files[ftype], .nf = writer->files[ftype],
}; };
tsdbTFileUpdVerRange(&op.nf, ofRange); tsdbTFileUpdVerRange(&op.nf, ofRange);
tsdbTFileUpdVerRange(&op.nf, writer->ctx->range); tsdbTFileUpdVerRange(&op.nf, writer->ctx->tombRange);
code = TARRAY2_APPEND(opArr, op); code = TARRAY2_APPEND(opArr, op);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }

View File

@ -486,9 +486,9 @@ static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray *
code = tsdbUpgradeOpenTombFile(tsdb, fset, &ctx->fd, &ctx->fobj, &ctx->toStt); code = tsdbUpgradeOpenTombFile(tsdb, fset, &ctx->fd, &ctx->fobj, &ctx->toStt);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN}; SVersionRange tombRange = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
code = tsdbFileWriteTombBlock(ctx->fd, ctx->tombBlock, ctx->cmprAlg, &ctx->fobj->f->size, ctx->tombBlkArray, code = tsdbFileWriteTombBlock(ctx->fd, ctx->tombBlock, ctx->cmprAlg, &ctx->fobj->f->size, ctx->tombBlkArray,
ctx->bufArr, &range); ctx->bufArr, &tombRange);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
} }
@ -499,9 +499,9 @@ static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray *
code = tsdbUpgradeOpenTombFile(tsdb, fset, &ctx->fd, &ctx->fobj, &ctx->toStt); code = tsdbUpgradeOpenTombFile(tsdb, fset, &ctx->fd, &ctx->fobj, &ctx->toStt);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN}; SVersionRange tombRange = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
code = tsdbFileWriteTombBlock(ctx->fd, ctx->tombBlock, ctx->cmprAlg, &ctx->fobj->f->size, ctx->tombBlkArray, code = tsdbFileWriteTombBlock(ctx->fd, ctx->tombBlock, ctx->cmprAlg, &ctx->fobj->f->size, ctx->tombBlkArray,
ctx->bufArr, &range); ctx->bufArr, &tombRange);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }