enh: write stt and data files with minVer and maxVer
This commit is contained in:
parent
da28d490aa
commit
ab7a20c117
|
@ -527,10 +527,16 @@ struct SSnapDataHdr {
|
|||
uint8_t data[];
|
||||
};
|
||||
|
||||
typedef struct SRange {
|
||||
int64_t start;
|
||||
int64_t end;
|
||||
} SRange;
|
||||
|
||||
struct SCommitInfo {
|
||||
SVnodeInfo info;
|
||||
SVnode* pVnode;
|
||||
TXN* txn;
|
||||
SRange vers;
|
||||
};
|
||||
|
||||
struct SCompactInfo {
|
||||
|
|
|
@ -43,6 +43,8 @@ typedef struct {
|
|||
SDiskID did;
|
||||
TSKEY minKey;
|
||||
TSKEY maxKey;
|
||||
int64_t minVer;
|
||||
int64_t maxVer;
|
||||
STFileSet *fset;
|
||||
TABLEID tbid[1];
|
||||
bool hasTSData;
|
||||
|
@ -74,6 +76,8 @@ static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) {
|
|||
.szPage = committer->szPage,
|
||||
.cmprAlg = committer->cmprAlg,
|
||||
.fid = committer->ctx->fid,
|
||||
.minVer = committer->ctx->minVer,
|
||||
.maxVer = committer->ctx->maxVer,
|
||||
.cid = committer->ctx->cid,
|
||||
.did = committer->ctx->did,
|
||||
.level = 0,
|
||||
|
@ -87,6 +91,8 @@ static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) {
|
|||
if (committer->ctx->fset->farr[ftype] != NULL) {
|
||||
config.files[ftype].exist = true;
|
||||
config.files[ftype].file = committer->ctx->fset->farr[ftype]->f[0];
|
||||
config.files[ftype].file.minVer = TMIN(config.files[ftype].file.minVer, config.minVer);
|
||||
config.files[ftype].file.maxVer = TMAX(config.files[ftype].file.maxVer, config.maxVer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -463,6 +469,8 @@ static int32_t tsdbOpenCommitter(STsdb *tsdb, SCommitInfo *info, SCommitter2 *co
|
|||
committer->compactVersion = INT64_MAX;
|
||||
committer->ctx->cid = tsdbFSAllocEid(tsdb->pFS);
|
||||
committer->ctx->now = taosGetTimestampSec();
|
||||
committer->ctx->minVer = info->vers.start;
|
||||
committer->ctx->maxVer = info->vers.end;
|
||||
|
||||
committer->ctx->nextKey = tsdb->imem->minKey;
|
||||
if (tsdb->imem->nDel > 0) {
|
||||
|
|
|
@ -589,6 +589,8 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) {
|
|||
.fid = writer->config->fid,
|
||||
.cid = writer->config->cid,
|
||||
.size = 0,
|
||||
.minVer = writer->config->minVer,
|
||||
.maxVer = writer->config->maxVer,
|
||||
};
|
||||
|
||||
// .data
|
||||
|
@ -602,6 +604,8 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) {
|
|||
.fid = writer->config->fid,
|
||||
.cid = writer->config->cid,
|
||||
.size = 0,
|
||||
.minVer = writer->config->minVer,
|
||||
.maxVer = writer->config->maxVer,
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -616,6 +620,8 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) {
|
|||
.fid = writer->config->fid,
|
||||
.cid = writer->config->cid,
|
||||
.size = 0,
|
||||
.minVer = writer->config->minVer,
|
||||
.maxVer = writer->config->maxVer,
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -627,6 +633,8 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) {
|
|||
.fid = writer->config->fid,
|
||||
.cid = writer->config->cid,
|
||||
.size = 0,
|
||||
.minVer = writer->config->minVer,
|
||||
.maxVer = writer->config->maxVer,
|
||||
};
|
||||
|
||||
writer->ctx->opened = true;
|
||||
|
|
|
@ -76,6 +76,8 @@ typedef struct SDataFileWriterConfig {
|
|||
int32_t maxRow;
|
||||
int32_t szPage;
|
||||
int32_t fid;
|
||||
int64_t minVer;
|
||||
int64_t maxVer;
|
||||
int64_t cid;
|
||||
SDiskID did;
|
||||
int64_t compactVersion;
|
||||
|
@ -101,4 +103,4 @@ int32_t tsdbDataFileWriteTombRecord(SDataFileWriter *writer, const STombRecord *
|
|||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TSDB_DATA_FILE_RW_H*/
|
||||
#endif /*_TSDB_DATA_FILE_RW_H*/
|
||||
|
|
|
@ -143,6 +143,8 @@ int32_t tsdbFSetWriterOpen(SFSetWriterConfig *config, SFSetWriter **writer) {
|
|||
.maxRow = config->maxRow,
|
||||
.szPage = config->szPage,
|
||||
.fid = config->fid,
|
||||
.minVer = config->minVer,
|
||||
.maxVer = config->maxVer,
|
||||
.cid = config->cid,
|
||||
.did = config->did,
|
||||
.compactVersion = config->compactVersion,
|
||||
|
@ -168,6 +170,8 @@ int32_t tsdbFSetWriterOpen(SFSetWriterConfig *config, SFSetWriter **writer) {
|
|||
.compactVersion = config->compactVersion,
|
||||
.did = config->did,
|
||||
.fid = config->fid,
|
||||
.minVer = config->minVer,
|
||||
.maxVer = config->maxVer,
|
||||
.cid = config->cid,
|
||||
.level = config->level,
|
||||
.skmTb = writer[0]->skmTb,
|
||||
|
@ -292,4 +296,4 @@ _exit:
|
|||
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,6 +34,8 @@ typedef struct {
|
|||
int32_t szPage;
|
||||
int8_t cmprAlg;
|
||||
int32_t fid;
|
||||
int64_t minVer;
|
||||
int64_t maxVer;
|
||||
int64_t cid;
|
||||
SDiskID did;
|
||||
int32_t level;
|
||||
|
@ -52,4 +54,4 @@ int32_t tsdbFSetWriteTombRecord(SFSetWriter *writer, const STombRecord *tombReco
|
|||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TSDB_FSET_RW_H*/
|
||||
#endif /*_TSDB_FSET_RW_H*/
|
||||
|
|
|
@ -313,6 +313,7 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) {
|
|||
if (merger->ctx->fset->farr[ftype]) {
|
||||
config.files[ftype].exist = true;
|
||||
config.files[ftype].file = merger->ctx->fset->farr[ftype]->f[0];
|
||||
|
||||
} else {
|
||||
config.files[ftype].exist = false;
|
||||
}
|
||||
|
|
|
@ -694,6 +694,8 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) {
|
|||
.fid = writer->config->fid,
|
||||
.cid = writer->config->cid,
|
||||
.size = 0,
|
||||
.minVer = writer->config->minVer,
|
||||
.maxVer = writer->config->maxVer,
|
||||
.stt[0] =
|
||||
{
|
||||
.level = writer->config->level,
|
||||
|
|
|
@ -79,6 +79,8 @@ struct SSttFileWriterConfig {
|
|||
int64_t compactVersion;
|
||||
SDiskID did;
|
||||
int32_t fid;
|
||||
int64_t minVer;
|
||||
int64_t maxVer;
|
||||
int64_t cid;
|
||||
int32_t level;
|
||||
SSkmInfo *skmTb;
|
||||
|
@ -90,4 +92,4 @@ struct SSttFileWriterConfig {
|
|||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TSDB_STT_FILE_RW_H*/
|
||||
#endif /*_TSDB_STT_FILE_RW_H*/
|
||||
|
|
|
@ -285,6 +285,7 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
|
|||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
char dir[TSDB_FILENAME_LEN] = {0};
|
||||
int64_t lastCommitted = pInfo->info.state.committed;
|
||||
|
||||
tsem_wait(&pVnode->canCommit);
|
||||
|
||||
|
@ -296,6 +297,8 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
|
|||
pInfo->info.state.committed = pVnode->state.applied;
|
||||
pInfo->info.state.commitTerm = pVnode->state.applyTerm;
|
||||
pInfo->info.state.commitID = ++pVnode->state.commitID;
|
||||
pInfo->vers.start = lastCommitted + 1;
|
||||
pInfo->vers.end = pInfo->info.state.committed;
|
||||
pInfo->pVnode = pVnode;
|
||||
pInfo->txn = metaGetTxn(pVnode->pMeta);
|
||||
|
||||
|
|
Loading…
Reference in New Issue