enh: record version range of STFile for data and stt files

This commit is contained in:
Benguang Zhao 2023-08-31 14:06:09 +08:00
parent 3cd458f5c9
commit 823aad4a5e
3 changed files with 85 additions and 25 deletions

View File

@ -16,7 +16,7 @@
#include "tsdbDataFileRW.h"
extern int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize,
TTombBlkArray *tombBlkArray, uint8_t **bufArr);
TTombBlkArray *tombBlkArray, uint8_t **bufArr, SVersionRange *range);
extern int32_t tsdbFileWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize);
// SDataFileReader =============================================
@ -589,8 +589,8 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) {
.fid = writer->config->fid,
.cid = writer->config->cid,
.size = 0,
.minVer = writer->config->minVer,
.maxVer = writer->config->maxVer,
.minVer = VERSION_MAX,
.maxVer = VERSION_MIN,
};
// .data
@ -604,8 +604,8 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) {
.fid = writer->config->fid,
.cid = writer->config->cid,
.size = 0,
.minVer = writer->config->minVer,
.maxVer = writer->config->maxVer,
.minVer = VERSION_MAX,
.maxVer = VERSION_MIN,
};
}
@ -620,8 +620,8 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) {
.fid = writer->config->fid,
.cid = writer->config->cid,
.size = 0,
.minVer = writer->config->minVer,
.maxVer = writer->config->maxVer,
.minVer = VERSION_MAX,
.maxVer = VERSION_MIN,
};
}
@ -633,8 +633,8 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) {
.fid = writer->config->fid,
.cid = writer->config->cid,
.size = 0,
.minVer = writer->config->minVer,
.maxVer = writer->config->maxVer,
.minVer = VERSION_MAX,
.maxVer = VERSION_MIN,
};
writer->ctx->opened = true;
@ -646,8 +646,14 @@ _exit:
return code;
}
static int32_t tsdbSDataUpdVerRange(SDataFileWriterConfig *config, SVersionRange *range) {
config->minVer = TMIN(config->minVer, range->minVer);
config->maxVer = TMAX(config->maxVer, range->maxVer);
return 0;
}
int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAlg, int64_t *fileSize,
TBrinBlkArray *brinBlkArray, uint8_t **bufArr) {
TBrinBlkArray *brinBlkArray, uint8_t **bufArr, SVersionRange *range) {
if (BRIN_BLOCK_SIZE(brinBlock) == 0) return 0;
int32_t code;
@ -686,6 +692,9 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAl
}
}
range->minVer = brinBlk->minVer;
range->maxVer = brinBlk->maxVer;
// write to file
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr1); i++) {
code = tsdbCmprData((uint8_t *)TARRAY2_DATA(brinBlock->dataArr1 + i), TARRAY2_DATA_LEN(brinBlock->dataArr1 + i),
@ -734,11 +743,15 @@ static int32_t tsdbDataFileWriteBrinBlock(SDataFileWriter *writer) {
int32_t code = 0;
int32_t lino = 0;
SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
code = tsdbFileWriteBrinBlock(writer->fd[TSDB_FTYPE_HEAD], writer->brinBlock, writer->config->cmprAlg,
&writer->files[TSDB_FTYPE_HEAD].size, writer->brinBlkArray, writer->config->bufArr);
&writer->files[TSDB_FTYPE_HEAD].size, writer->brinBlkArray, writer->config->bufArr,
&range);
TSDB_CHECK_CODE(code, lino, _exit);
tsdbSDataUpdVerRange(writer->config, &range);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
@ -803,6 +816,9 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData
}
}
SVersionRange range = {.minVer = record->minVer, .maxVer = record->maxVer};
tsdbSDataUpdVerRange(writer->config, &range);
// to .data file
int32_t sizeArr[5] = {0};
@ -1170,10 +1186,12 @@ static int32_t tsdbDataFileDoWriteTombBlock(SDataFileWriter *writer) {
int32_t code = 0;
int32_t lino = 0;
SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
code = tsdbFileWriteTombBlock(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlock, writer->config->cmprAlg,
&writer->files[TSDB_FTYPE_TOMB].size, writer->tombBlkArray, writer->config->bufArr);
&writer->files[TSDB_FTYPE_TOMB].size, writer->tombBlkArray, writer->config->bufArr, &range);
TSDB_CHECK_CODE(code, lino, _exit);
tsdbSDataUpdVerRange(writer->config, &range);
_exit:
if (code) {
@ -1358,6 +1376,8 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
.fid = writer->config->fid,
.nf = writer->files[ftype],
};
op.nf.minVer = TMIN(op.nf.minVer, writer->config->minVer);
op.nf.maxVer = TMAX(op.nf.maxVer, writer->config->maxVer);
code = TARRAY2_APPEND(opArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
@ -1369,6 +1389,8 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
.fid = writer->config->fid,
.nf = writer->files[ftype],
};
op.nf.minVer = TMIN(op.nf.minVer, writer->config->minVer);
op.nf.maxVer = TMAX(op.nf.maxVer, writer->config->maxVer);
code = TARRAY2_APPEND(opArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
} else if (writer->config->files[ftype].file.size != writer->files[ftype].size) {
@ -1378,6 +1400,8 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
.of = writer->config->files[ftype].file,
.nf = writer->files[ftype],
};
op.nf.minVer = TMIN(op.nf.minVer, writer->config->minVer);
op.nf.maxVer = TMAX(op.nf.maxVer, writer->config->maxVer);
code = TARRAY2_APPEND(opArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -1390,6 +1414,8 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
.fid = writer->config->fid,
.nf = writer->files[ftype],
};
op.nf.minVer = TMIN(op.nf.minVer, writer->config->minVer);
op.nf.maxVer = TMAX(op.nf.maxVer, writer->config->maxVer);
code = TARRAY2_APPEND(opArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
} else if (writer->config->files[ftype].file.size != writer->files[ftype].size) {
@ -1399,6 +1425,8 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
.of = writer->config->files[ftype].file,
.nf = writer->files[ftype],
};
op.nf.minVer = TMIN(op.nf.minVer, writer->config->minVer);
op.nf.maxVer = TMAX(op.nf.maxVer, writer->config->maxVer);
code = TARRAY2_APPEND(opArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -1438,6 +1466,8 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
.fid = writer->config->fid,
.nf = writer->files[ftype],
};
op.nf.minVer = TMIN(op.nf.minVer, writer->config->minVer);
op.nf.maxVer = TMAX(op.nf.maxVer, writer->config->maxVer);
code = TARRAY2_APPEND(opArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -1447,9 +1477,14 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
code = tsdbFsyncFile(writer->fd[i]);
TSDB_CHECK_CODE(code, lino, _exit);
tsdbCloseFile(&writer->fd[i]);
writer->files[i].minVer = TMIN(writer->files[i].minVer, writer->config->minVer);
writer->files[i].maxVer = TMAX(writer->files[i].maxVer, writer->config->maxVer);
}
}
writer->config->minVer = VERSION_MAX;
writer->config->maxVer = VERSION_MIN;
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
@ -1606,6 +1641,7 @@ int32_t tsdbDataFileWriteBlockData(SDataFileWriter *writer, SBlockData *bData) {
) {
code = tsdbDataFileDoWriteBlockData(writer, bData);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
for (int32_t i = 0; i < bData->nRow; ++i) {
TSDBROW row[1] = {tsdbRowFromBlockData(bData, i)};

View File

@ -401,8 +401,14 @@ struct SSttFileWriter {
uint8_t *bufArr[5];
};
int32_t tsdbFileDoWriteBlockData(STsdbFD *fd, SBlockData *blockData, int8_t cmprAlg, int64_t *fileSize,
TSttBlkArray *sttBlkArray, uint8_t **bufArr) {
static int32_t tsdbSSttUpdVerRange(SSttFileWriterConfig *config, SVersionRange *range) {
config->minVer = TMIN(config->minVer, range->minVer);
config->maxVer = TMAX(config->maxVer, range->maxVer);
return 0;
}
static int32_t tsdbFileDoWriteSttBlockData(STsdbFD *fd, SBlockData *blockData, int8_t cmprAlg, int64_t *fileSize,
TSttBlkArray *sttBlkArray, uint8_t **bufArr, SVersionRange *range) {
if (blockData->nRow == 0) return 0;
int32_t code = 0;
@ -425,6 +431,9 @@ int32_t tsdbFileDoWriteBlockData(STsdbFD *fd, SBlockData *blockData, int8_t cmpr
if (sttBlk->maxVer < blockData->aVersion[iRow]) sttBlk->maxVer = blockData->aVersion[iRow];
}
range->minVer = sttBlk->minVer;
range->maxVer = sttBlk->maxVer;
int32_t sizeArr[5] = {0};
code = tCmprBlockData(blockData, cmprAlg, NULL, NULL, bufArr, sizeArr);
if (code) return code;
@ -455,9 +464,11 @@ static int32_t tsdbSttFileDoWriteBlockData(SSttFileWriter *writer) {
int32_t code = 0;
int32_t lino = 0;
code = tsdbFileDoWriteBlockData(writer->fd, writer->blockData, writer->config->cmprAlg, &writer->file->size,
writer->sttBlkArray, writer->config->bufArr);
SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
code = tsdbFileDoWriteSttBlockData(writer->fd, writer->blockData, writer->config->cmprAlg, &writer->file->size,
writer->sttBlkArray, writer->config->bufArr, &range);
TSDB_CHECK_CODE(code, lino, _exit);
tsdbSSttUpdVerRange(writer->config, &range);
_exit:
if (code) {
@ -518,7 +529,7 @@ _exit:
}
int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize,
TTombBlkArray *tombBlkArray, uint8_t **bufArr) {
TTombBlkArray *tombBlkArray, uint8_t **bufArr, SVersionRange *range) {
int32_t code;
if (TOMB_BLOCK_SIZE(tombBlock) == 0) return 0;
@ -554,6 +565,9 @@ int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAl
}
}
range->minVer = tombBlk->minVer;
range->maxVer = tombBlk->maxVer;
for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->dataArr); i++) {
code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&tombBlock->dataArr[i]), TARRAY2_DATA_LEN(&tombBlock->dataArr[i]),
TSDB_DATA_TYPE_BIGINT, tombBlk->cmprAlg, &bufArr[0], 0, &tombBlk->size[i], &bufArr[1]);
@ -579,9 +593,11 @@ static int32_t tsdbSttFileDoWriteTombBlock(SSttFileWriter *writer) {
int32_t code = 0;
int32_t lino = 0;
SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
code = tsdbFileWriteTombBlock(writer->fd, writer->tombBlock, writer->config->cmprAlg, &writer->file->size,
writer->tombBlkArray, writer->config->bufArr);
writer->tombBlkArray, writer->config->bufArr, &range);
TSDB_CHECK_CODE(code, lino, _exit);
tsdbSSttUpdVerRange(writer->config, &range);
_exit:
if (code) {
@ -784,6 +800,10 @@ static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, TFileOpArray *o
.fid = writer->config->fid,
.nf = writer->file[0],
};
op.nf.minVer = TMIN(op.nf.minVer, writer->config->minVer);
op.nf.maxVer = TMAX(op.nf.maxVer, writer->config->maxVer);
writer->config->minVer = VERSION_MAX;
writer->config->maxVer = VERSION_MIN;
code = TARRAY2_APPEND(opArray, op);
TSDB_CHECK_CODE(code, lino, _exit);

View File

@ -23,7 +23,7 @@ extern int32_t tsdbReadDataBlockEx(SDataFReader *pReader, SDataBlk *pDataBlk, SB
extern int32_t save_fs(const TFileSetArray *arr, const char *fname);
extern int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype);
extern int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAlg, int64_t *fileSize,
TBrinBlkArray *brinBlkArray, uint8_t **bufArr);
TBrinBlkArray *brinBlkArray, uint8_t **bufArr, SVersionRange *range);
extern int32_t tsdbFileWriteBrinBlk(STsdbFD *fd, TBrinBlkArray *brinBlkArray, SFDataPtr *ptr, int64_t *fileSize);
extern int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFooter *footer);
extern int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl);
@ -31,7 +31,7 @@ extern int32_t tsdbSttLvlClear(SSttLvl **lvl);
extern int32_t tsdbFileWriteSttBlk(STsdbFD *fd, const TSttBlkArray *sttBlkArray, SFDataPtr *ptr, int64_t *fileSize);
extern int32_t tsdbFileWriteSttFooter(STsdbFD *fd, const SSttFooter *footer, int64_t *fileSize);
extern int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize,
TTombBlkArray *tombBlkArray, uint8_t **bufArr);
TTombBlkArray *tombBlkArray, uint8_t **bufArr, SVersionRange *range);
extern int32_t tsdbFileWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize);
extern int32_t tsdbFileWriteTombFooter(STsdbFD *fd, const STombFooter *footer, int64_t *fileSize);
@ -129,16 +129,18 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *
TSDB_CHECK_CODE(code, lino, _exit);
if (BRIN_BLOCK_SIZE(ctx->brinBlock) >= ctx->maxRow) {
SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size,
ctx->brinBlkArray, ctx->bufArr);
ctx->brinBlkArray, ctx->bufArr, &range);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
}
if (BRIN_BLOCK_SIZE(ctx->brinBlock) > 0) {
SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size,
ctx->brinBlkArray, ctx->bufArr);
ctx->brinBlkArray, ctx->bufArr, &range);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -493,8 +495,9 @@ static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray *
code = tsdbUpgradeOpenTombFile(tsdb, fset, &ctx->fd, &ctx->fobj, &ctx->toStt);
TSDB_CHECK_CODE(code, lino, _exit);
}
SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
code = tsdbFileWriteTombBlock(ctx->fd, ctx->tombBlock, ctx->cmprAlg, &ctx->fobj->f->size, ctx->tombBlkArray,
ctx->bufArr);
ctx->bufArr, &range);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
@ -505,8 +508,9 @@ static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray *
code = tsdbUpgradeOpenTombFile(tsdb, fset, &ctx->fd, &ctx->fobj, &ctx->toStt);
TSDB_CHECK_CODE(code, lino, _exit);
}
SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
code = tsdbFileWriteTombBlock(ctx->fd, ctx->tombBlock, ctx->cmprAlg, &ctx->fobj->f->size, ctx->tombBlkArray,
ctx->bufArr);
ctx->bufArr, &range);
TSDB_CHECK_CODE(code, lino, _exit);
}