enh: record ver range in writer ctx

This commit is contained in:
Benguang Zhao 2023-08-31 19:13:58 +08:00
parent f207b3ddd5
commit 01d0c1247d
5 changed files with 42 additions and 62 deletions

View File

@ -487,6 +487,8 @@ struct SDataFileWriter {
int32_t tombBlkArrayIdx; int32_t tombBlkArrayIdx;
STombBlock tombBlock[1]; STombBlock tombBlock[1];
int32_t tombBlockIdx; int32_t tombBlockIdx;
// range
SVersionRange range;
} ctx[1]; } ctx[1];
STFile files[TSDB_FTYPE_MAX]; STFile files[TSDB_FTYPE_MAX];
@ -633,6 +635,9 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) {
.maxVer = VERSION_MIN, .maxVer = VERSION_MIN,
}; };
// range
writer->ctx->range = (SVersionRange){.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
writer->ctx->opened = true; writer->ctx->opened = true;
_exit: _exit:
@ -642,9 +647,9 @@ _exit:
return code; return code;
} }
static int32_t tsdbDataWriterUpdVerRange(SDataFileWriterConfig *config, SVersionRange *range) { int32_t tsdbWriterUpdVerRange(SVersionRange *range, int64_t minVer, int64_t maxVer) {
config->minVer = TMIN(config->minVer, range->minVer); range->minVer = TMIN(range->minVer, minVer);
config->maxVer = TMAX(config->maxVer, range->maxVer); range->maxVer = TMAX(range->maxVer, maxVer);
return 0; return 0;
} }
@ -688,8 +693,7 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAl
} }
} }
range->minVer = brinBlk->minVer; tsdbWriterUpdVerRange(range, brinBlk->minVer, brinBlk->maxVer);
range->maxVer = brinBlk->maxVer;
// write to file // write to file
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr1); i++) { for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr1); i++) {
@ -739,15 +743,12 @@ static int32_t tsdbDataFileWriteBrinBlock(SDataFileWriter *writer) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
code = tsdbFileWriteBrinBlock(writer->fd[TSDB_FTYPE_HEAD], writer->brinBlock, writer->config->cmprAlg, code = tsdbFileWriteBrinBlock(writer->fd[TSDB_FTYPE_HEAD], writer->brinBlock, writer->config->cmprAlg,
&writer->files[TSDB_FTYPE_HEAD].size, writer->brinBlkArray, writer->config->bufArr, &writer->files[TSDB_FTYPE_HEAD].size, writer->brinBlkArray, writer->config->bufArr,
&range); &writer->ctx->range);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
tsdbDataWriterUpdVerRange(writer->config, &range);
_exit: _exit:
if (code) { if (code) {
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
@ -812,8 +813,7 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData
} }
} }
SVersionRange range = {.minVer = record->minVer, .maxVer = record->maxVer}; tsdbWriterUpdVerRange(&writer->ctx->range, record->minVer, record->maxVer);
tsdbDataWriterUpdVerRange(writer->config, &range);
// to .data file // to .data file
int32_t sizeArr[5] = {0}; int32_t sizeArr[5] = {0};
@ -1200,8 +1200,7 @@ int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAl
} }
} }
range->minVer = tombBlk->minVer; tsdbWriterUpdVerRange(range, tombBlk->minVer, tombBlk->maxVer);
range->maxVer = tombBlk->maxVer;
for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->dataArr); i++) { for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->dataArr); i++) {
code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&tombBlock->dataArr[i]), TARRAY2_DATA_LEN(&tombBlock->dataArr[i]), code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&tombBlock->dataArr[i]), TARRAY2_DATA_LEN(&tombBlock->dataArr[i]),
@ -1242,11 +1241,10 @@ static int32_t tsdbDataFileDoWriteTombBlock(SDataFileWriter *writer) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
code = tsdbFileWriteTombBlock(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlock, writer->config->cmprAlg, code = tsdbFileWriteTombBlock(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlock, writer->config->cmprAlg,
&writer->files[TSDB_FTYPE_TOMB].size, writer->tombBlkArray, writer->config->bufArr, &range); &writer->files[TSDB_FTYPE_TOMB].size, writer->tombBlkArray, writer->config->bufArr,
&writer->ctx->range);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
tsdbDataWriterUpdVerRange(writer->config, &range);
_exit: _exit:
if (code) { if (code) {
@ -1402,6 +1400,12 @@ _exit:
return code; return code;
} }
int32_t tsdbTFileUpdVerRange(STFile *f, SVersionRange range) {
f->minVer = TMIN(f->minVer, range.minVer);
f->maxVer = TMAX(f->maxVer, range.maxVer);
return 0;
}
static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArray *opArr) { static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArray *opArr) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
@ -1446,8 +1450,7 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
.fid = writer->config->fid, .fid = writer->config->fid,
.nf = writer->files[ftype], .nf = writer->files[ftype],
}; };
op.nf.minVer = TMIN(op.nf.minVer, writer->config->minVer); tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
op.nf.maxVer = TMAX(op.nf.maxVer, writer->config->maxVer);
code = TARRAY2_APPEND(opArr, op); code = TARRAY2_APPEND(opArr, op);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
@ -1459,8 +1462,7 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
.fid = writer->config->fid, .fid = writer->config->fid,
.nf = writer->files[ftype], .nf = writer->files[ftype],
}; };
op.nf.minVer = TMIN(op.nf.minVer, writer->config->minVer); tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
op.nf.maxVer = TMAX(op.nf.maxVer, writer->config->maxVer);
code = TARRAY2_APPEND(opArr, op); code = TARRAY2_APPEND(opArr, op);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} else if (writer->config->files[ftype].file.size != writer->files[ftype].size) { } else if (writer->config->files[ftype].file.size != writer->files[ftype].size) {
@ -1470,8 +1472,7 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
.of = writer->config->files[ftype].file, .of = writer->config->files[ftype].file,
.nf = writer->files[ftype], .nf = writer->files[ftype],
}; };
op.nf.minVer = TMIN(op.nf.minVer, writer->config->minVer); tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
op.nf.maxVer = TMAX(op.nf.maxVer, writer->config->maxVer);
code = TARRAY2_APPEND(opArr, op); code = TARRAY2_APPEND(opArr, op);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
@ -1484,8 +1485,7 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
.fid = writer->config->fid, .fid = writer->config->fid,
.nf = writer->files[ftype], .nf = writer->files[ftype],
}; };
op.nf.minVer = TMIN(op.nf.minVer, writer->config->minVer); tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
op.nf.maxVer = TMAX(op.nf.maxVer, writer->config->maxVer);
code = TARRAY2_APPEND(opArr, op); code = TARRAY2_APPEND(opArr, op);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} else if (writer->config->files[ftype].file.size != writer->files[ftype].size) { } else if (writer->config->files[ftype].file.size != writer->files[ftype].size) {
@ -1495,8 +1495,7 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
.of = writer->config->files[ftype].file, .of = writer->config->files[ftype].file,
.nf = writer->files[ftype], .nf = writer->files[ftype],
}; };
op.nf.minVer = TMIN(op.nf.minVer, writer->config->minVer); tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
op.nf.maxVer = TMAX(op.nf.maxVer, writer->config->maxVer);
code = TARRAY2_APPEND(opArr, op); code = TARRAY2_APPEND(opArr, op);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
@ -1536,8 +1535,7 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
.fid = writer->config->fid, .fid = writer->config->fid,
.nf = writer->files[ftype], .nf = writer->files[ftype],
}; };
op.nf.minVer = TMIN(op.nf.minVer, writer->config->minVer); tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
op.nf.maxVer = TMAX(op.nf.maxVer, writer->config->maxVer);
code = TARRAY2_APPEND(opArr, op); code = TARRAY2_APPEND(opArr, op);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
@ -1547,14 +1545,9 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
code = tsdbFsyncFile(writer->fd[i]); code = tsdbFsyncFile(writer->fd[i]);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
tsdbCloseFile(&writer->fd[i]); tsdbCloseFile(&writer->fd[i]);
writer->files[i].minVer = TMIN(writer->files[i].minVer, writer->config->minVer);
writer->files[i].maxVer = TMAX(writer->files[i].maxVer, writer->config->maxVer);
} }
} }
writer->config->minVer = VERSION_MAX;
writer->config->maxVer = VERSION_MIN;
_exit: _exit:
if (code) { if (code) {
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);

View File

@ -76,8 +76,6 @@ typedef struct SDataFileWriterConfig {
int32_t maxRow; int32_t maxRow;
int32_t szPage; int32_t szPage;
int32_t fid; int32_t fid;
int64_t minVer;
int64_t maxVer;
int64_t cid; int64_t cid;
SDiskID did; SDiskID did;
int64_t compactVersion; int64_t compactVersion;
@ -110,6 +108,9 @@ int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAl
int32_t tsdbFileWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize); int32_t tsdbFileWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize);
int32_t tsdbFileWriteTombFooter(STsdbFD *fd, const STombFooter *footer, int64_t *fileSize); int32_t tsdbFileWriteTombFooter(STsdbFD *fd, const STombFooter *footer, int64_t *fileSize);
// utils
int32_t tsdbWriterUpdVerRange(SVersionRange *range, int64_t minVer, int64_t maxVer);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -143,8 +143,6 @@ int32_t tsdbFSetWriterOpen(SFSetWriterConfig *config, SFSetWriter **writer) {
.maxRow = config->maxRow, .maxRow = config->maxRow,
.szPage = config->szPage, .szPage = config->szPage,
.fid = config->fid, .fid = config->fid,
.minVer = VERSION_MAX,
.maxVer = VERSION_MIN,
.cid = config->cid, .cid = config->cid,
.did = config->did, .did = config->did,
.compactVersion = config->compactVersion, .compactVersion = config->compactVersion,
@ -170,8 +168,6 @@ int32_t tsdbFSetWriterOpen(SFSetWriterConfig *config, SFSetWriter **writer) {
.compactVersion = config->compactVersion, .compactVersion = config->compactVersion,
.did = config->did, .did = config->did,
.fid = config->fid, .fid = config->fid,
.minVer = VERSION_MAX,
.maxVer = VERSION_MIN,
.cid = config->cid, .cid = config->cid,
.level = config->level, .level = config->level,
.skmTb = writer[0]->skmTb, .skmTb = writer[0]->skmTb,

View File

@ -384,6 +384,8 @@ struct SSttFileWriter {
struct { struct {
bool opened; bool opened;
TABLEID tbid[1]; TABLEID tbid[1];
// range
SVersionRange range;
} ctx[1]; } ctx[1];
// file // file
STsdbFD *fd; STsdbFD *fd;
@ -402,12 +404,6 @@ struct SSttFileWriter {
uint8_t *bufArr[5]; uint8_t *bufArr[5];
}; };
static int32_t tsdbSttWriterUpdVerRange(SSttFileWriterConfig *config, SVersionRange *range) {
config->minVer = TMIN(config->minVer, range->minVer);
config->maxVer = TMAX(config->maxVer, range->maxVer);
return 0;
}
static int32_t tsdbFileDoWriteSttBlockData(STsdbFD *fd, SBlockData *blockData, int8_t cmprAlg, int64_t *fileSize, static int32_t tsdbFileDoWriteSttBlockData(STsdbFD *fd, SBlockData *blockData, int8_t cmprAlg, int64_t *fileSize,
TSttBlkArray *sttBlkArray, uint8_t **bufArr, SVersionRange *range) { TSttBlkArray *sttBlkArray, uint8_t **bufArr, SVersionRange *range) {
if (blockData->nRow == 0) return 0; if (blockData->nRow == 0) return 0;
@ -432,8 +428,7 @@ static int32_t tsdbFileDoWriteSttBlockData(STsdbFD *fd, SBlockData *blockData, i
if (sttBlk->maxVer < blockData->aVersion[iRow]) sttBlk->maxVer = blockData->aVersion[iRow]; if (sttBlk->maxVer < blockData->aVersion[iRow]) sttBlk->maxVer = blockData->aVersion[iRow];
} }
range->minVer = sttBlk->minVer; tsdbWriterUpdVerRange(range, sttBlk->minVer, sttBlk->maxVer);
range->maxVer = sttBlk->maxVer;
int32_t sizeArr[5] = {0}; int32_t sizeArr[5] = {0};
code = tCmprBlockData(blockData, cmprAlg, NULL, NULL, bufArr, sizeArr); code = tCmprBlockData(blockData, cmprAlg, NULL, NULL, bufArr, sizeArr);
@ -465,11 +460,9 @@ static int32_t tsdbSttFileDoWriteBlockData(SSttFileWriter *writer) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
code = tsdbFileDoWriteSttBlockData(writer->fd, writer->blockData, writer->config->cmprAlg, &writer->file->size, code = tsdbFileDoWriteSttBlockData(writer->fd, writer->blockData, writer->config->cmprAlg, &writer->file->size,
writer->sttBlkArray, writer->config->bufArr, &range); writer->sttBlkArray, writer->config->bufArr, &writer->ctx->range);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
tsdbSttWriterUpdVerRange(writer->config, &range);
_exit: _exit:
if (code) { if (code) {
@ -535,11 +528,9 @@ static int32_t tsdbSttFileDoWriteTombBlock(SSttFileWriter *writer) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
code = tsdbFileWriteTombBlock(writer->fd, writer->tombBlock, writer->config->cmprAlg, &writer->file->size, code = tsdbFileWriteTombBlock(writer->fd, writer->tombBlock, writer->config->cmprAlg, &writer->file->size,
writer->tombBlkArray, writer->config->bufArr, &range); writer->tombBlkArray, writer->config->bufArr, &writer->ctx->range);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
tsdbSttWriterUpdVerRange(writer->config, &range);
_exit: _exit:
if (code) { if (code) {
@ -637,8 +628,8 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) {
.fid = writer->config->fid, .fid = writer->config->fid,
.cid = writer->config->cid, .cid = writer->config->cid,
.size = 0, .size = 0,
.minVer = writer->config->minVer, .minVer = VERSION_MAX,
.maxVer = writer->config->maxVer, .maxVer = VERSION_MIN,
.stt[0] = .stt[0] =
{ {
.level = writer->config->level, .level = writer->config->level,
@ -658,6 +649,9 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
writer->file->size += sizeof(hdr); writer->file->size += sizeof(hdr);
// range
writer->ctx->range = (SVersionRange){.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
writer->ctx->opened = true; writer->ctx->opened = true;
_exit: _exit:
@ -727,10 +721,8 @@ static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, TFileOpArray *o
.fid = writer->config->fid, .fid = writer->config->fid,
.nf = writer->file[0], .nf = writer->file[0],
}; };
op.nf.minVer = TMIN(op.nf.minVer, writer->config->minVer); op.nf.minVer = TMIN(op.nf.minVer, writer->ctx->range.minVer);
op.nf.maxVer = TMAX(op.nf.maxVer, writer->config->maxVer); op.nf.maxVer = TMAX(op.nf.maxVer, writer->ctx->range.maxVer);
writer->config->minVer = VERSION_MAX;
writer->config->maxVer = VERSION_MIN;
code = TARRAY2_APPEND(opArray, op); code = TARRAY2_APPEND(opArray, op);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);

View File

@ -82,8 +82,6 @@ struct SSttFileWriterConfig {
int64_t compactVersion; int64_t compactVersion;
SDiskID did; SDiskID did;
int32_t fid; int32_t fid;
int64_t minVer;
int64_t maxVer;
int64_t cid; int64_t cid;
int32_t level; int32_t level;
SSkmInfo *skmTb; SSkmInfo *skmTb;