From ab7a20c1171a847da18c4e9bef8b030c83bfa9ae Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Fri, 25 Aug 2023 19:04:10 +0800 Subject: [PATCH] enh: write stt and data files with minVer and maxVer --- source/dnode/vnode/src/inc/vnodeInt.h | 6 ++++++ source/dnode/vnode/src/tsdb/tsdbCommit2.c | 8 ++++++++ source/dnode/vnode/src/tsdb/tsdbDataFileRW.c | 8 ++++++++ source/dnode/vnode/src/tsdb/tsdbDataFileRW.h | 4 +++- source/dnode/vnode/src/tsdb/tsdbFSetRW.c | 6 +++++- source/dnode/vnode/src/tsdb/tsdbFSetRW.h | 4 +++- source/dnode/vnode/src/tsdb/tsdbMerge.c | 1 + source/dnode/vnode/src/tsdb/tsdbSttFileRW.c | 2 ++ source/dnode/vnode/src/tsdb/tsdbSttFileRW.h | 4 +++- source/dnode/vnode/src/vnd/vnodeCommit.c | 3 +++ 10 files changed, 42 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index cc355c5f32..7a242b55cf 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -527,10 +527,16 @@ struct SSnapDataHdr { uint8_t data[]; }; +typedef struct SRange { + int64_t start; + int64_t end; +} SRange; + struct SCommitInfo { SVnodeInfo info; SVnode* pVnode; TXN* txn; + SRange vers; }; struct SCompactInfo { diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index 79964c5636..4e096a7f17 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -43,6 +43,8 @@ typedef struct { SDiskID did; TSKEY minKey; TSKEY maxKey; + int64_t minVer; + int64_t maxVer; STFileSet *fset; TABLEID tbid[1]; bool hasTSData; @@ -74,6 +76,8 @@ static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) { .szPage = committer->szPage, .cmprAlg = committer->cmprAlg, .fid = committer->ctx->fid, + .minVer = committer->ctx->minVer, + .maxVer = committer->ctx->maxVer, .cid = committer->ctx->cid, .did = committer->ctx->did, .level = 0, @@ -87,6 +91,8 @@ static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) { if (committer->ctx->fset->farr[ftype] != NULL) { config.files[ftype].exist = true; config.files[ftype].file = committer->ctx->fset->farr[ftype]->f[0]; + config.files[ftype].file.minVer = TMIN(config.files[ftype].file.minVer, config.minVer); + config.files[ftype].file.maxVer = TMAX(config.files[ftype].file.maxVer, config.maxVer); } } } @@ -463,6 +469,8 @@ static int32_t tsdbOpenCommitter(STsdb *tsdb, SCommitInfo *info, SCommitter2 *co committer->compactVersion = INT64_MAX; committer->ctx->cid = tsdbFSAllocEid(tsdb->pFS); committer->ctx->now = taosGetTimestampSec(); + committer->ctx->minVer = info->vers.start; + committer->ctx->maxVer = info->vers.end; committer->ctx->nextKey = tsdb->imem->minKey; if (tsdb->imem->nDel > 0) { diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c index 6e4cb517ff..3265bb7cc7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c @@ -589,6 +589,8 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) { .fid = writer->config->fid, .cid = writer->config->cid, .size = 0, + .minVer = writer->config->minVer, + .maxVer = writer->config->maxVer, }; // .data @@ -602,6 +604,8 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) { .fid = writer->config->fid, .cid = writer->config->cid, .size = 0, + .minVer = writer->config->minVer, + .maxVer = writer->config->maxVer, }; } @@ -616,6 +620,8 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) { .fid = writer->config->fid, .cid = writer->config->cid, .size = 0, + .minVer = writer->config->minVer, + .maxVer = writer->config->maxVer, }; } @@ -627,6 +633,8 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) { .fid = writer->config->fid, .cid = writer->config->cid, .size = 0, + .minVer = writer->config->minVer, + .maxVer = writer->config->maxVer, }; writer->ctx->opened = true; diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.h b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.h index 827b58fb4a..e87c00d382 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.h +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.h @@ -76,6 +76,8 @@ typedef struct SDataFileWriterConfig { int32_t maxRow; int32_t szPage; int32_t fid; + int64_t minVer; + int64_t maxVer; int64_t cid; SDiskID did; int64_t compactVersion; @@ -101,4 +103,4 @@ int32_t tsdbDataFileWriteTombRecord(SDataFileWriter *writer, const STombRecord * } #endif -#endif /*_TSDB_DATA_FILE_RW_H*/ \ No newline at end of file +#endif /*_TSDB_DATA_FILE_RW_H*/ diff --git a/source/dnode/vnode/src/tsdb/tsdbFSetRW.c b/source/dnode/vnode/src/tsdb/tsdbFSetRW.c index 83ae8c2429..801ae59838 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSetRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbFSetRW.c @@ -143,6 +143,8 @@ int32_t tsdbFSetWriterOpen(SFSetWriterConfig *config, SFSetWriter **writer) { .maxRow = config->maxRow, .szPage = config->szPage, .fid = config->fid, + .minVer = config->minVer, + .maxVer = config->maxVer, .cid = config->cid, .did = config->did, .compactVersion = config->compactVersion, @@ -168,6 +170,8 @@ int32_t tsdbFSetWriterOpen(SFSetWriterConfig *config, SFSetWriter **writer) { .compactVersion = config->compactVersion, .did = config->did, .fid = config->fid, + .minVer = config->minVer, + .maxVer = config->maxVer, .cid = config->cid, .level = config->level, .skmTb = writer[0]->skmTb, @@ -292,4 +296,4 @@ _exit: TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } return code; -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/tsdb/tsdbFSetRW.h b/source/dnode/vnode/src/tsdb/tsdbFSetRW.h index b5710407cf..a733bb3c44 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSetRW.h +++ b/source/dnode/vnode/src/tsdb/tsdbFSetRW.h @@ -34,6 +34,8 @@ typedef struct { int32_t szPage; int8_t cmprAlg; int32_t fid; + int64_t minVer; + int64_t maxVer; int64_t cid; SDiskID did; int32_t level; @@ -52,4 +54,4 @@ int32_t tsdbFSetWriteTombRecord(SFSetWriter *writer, const STombRecord *tombReco } #endif -#endif /*_TSDB_FSET_RW_H*/ \ No newline at end of file +#endif /*_TSDB_FSET_RW_H*/ diff --git a/source/dnode/vnode/src/tsdb/tsdbMerge.c b/source/dnode/vnode/src/tsdb/tsdbMerge.c index 42a8b5bb3f..e659cedba3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/tsdbMerge.c @@ -313,6 +313,7 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) { if (merger->ctx->fset->farr[ftype]) { config.files[ftype].exist = true; config.files[ftype].file = merger->ctx->fset->farr[ftype]->f[0]; + } else { config.files[ftype].exist = false; } diff --git a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c index fa8d2d5ba4..4f1eb49959 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c @@ -694,6 +694,8 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) { .fid = writer->config->fid, .cid = writer->config->cid, .size = 0, + .minVer = writer->config->minVer, + .maxVer = writer->config->maxVer, .stt[0] = { .level = writer->config->level, diff --git a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.h b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.h index 242b55795c..d0481d5ec3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.h +++ b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.h @@ -79,6 +79,8 @@ struct SSttFileWriterConfig { int64_t compactVersion; SDiskID did; int32_t fid; + int64_t minVer; + int64_t maxVer; int64_t cid; int32_t level; SSkmInfo *skmTb; @@ -90,4 +92,4 @@ struct SSttFileWriterConfig { } #endif -#endif /*_TSDB_STT_FILE_RW_H*/ \ No newline at end of file +#endif /*_TSDB_STT_FILE_RW_H*/ diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 136168c5cc..775b298268 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -285,6 +285,7 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { int32_t code = 0; int32_t lino = 0; char dir[TSDB_FILENAME_LEN] = {0}; + int64_t lastCommitted = pInfo->info.state.committed; tsem_wait(&pVnode->canCommit); @@ -296,6 +297,8 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { pInfo->info.state.committed = pVnode->state.applied; pInfo->info.state.commitTerm = pVnode->state.applyTerm; pInfo->info.state.commitID = ++pVnode->state.commitID; + pInfo->vers.start = lastCommitted + 1; + pInfo->vers.end = pInfo->info.state.committed; pInfo->pVnode = pVnode; pInfo->txn = metaGetTxn(pVnode->pMeta);