enh: iterate over a list of snap ranges for STsdbSnapReader
This commit is contained in:
parent
56160a12de
commit
573c28fd06
|
@ -672,6 +672,9 @@ struct SDelFWriter {
|
|||
typedef struct STFileSet STFileSet;
|
||||
typedef TARRAY2(STFileSet *) TFileSetArray;
|
||||
|
||||
typedef struct STSnapRange STSnapRange;
|
||||
typedef TARRAY2(STSnapRange *) TSnapRangeArray; // disjoint snap ranges
|
||||
|
||||
struct STsdbReadSnap {
|
||||
SMemTable *pMem;
|
||||
SQueryNode *pNode;
|
||||
|
|
|
@ -991,6 +991,40 @@ int32_t tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, TSnapRangeArray **fsrArr) {
|
||||
int32_t code = 0;
|
||||
STFileSet *fset;
|
||||
STSnapRange *fsr1;
|
||||
|
||||
fsrArr[0] = taosMemoryCalloc(1, sizeof(*fsrArr[0]));
|
||||
if (fsrArr[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
taosThreadRwlockRdlock(&fs->tsdb->rwLock);
|
||||
TARRAY2_FOREACH(fs->fSetArr, fset) {
|
||||
code = tsdbTSnapRangeInitRef(fs->tsdb, fset, &fsr1);
|
||||
if (code) break;
|
||||
|
||||
code = TARRAY2_APPEND(fsrArr[0], fsr1);
|
||||
if (code) break;
|
||||
}
|
||||
taosThreadRwlockUnlock(&fs->tsdb->rwLock);
|
||||
|
||||
if (code) {
|
||||
TARRAY2_DESTROY(fsrArr[0], tsdbTSnapRangeClear);
|
||||
fsrArr[0] = NULL;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbFSDestroyRefRangedSnapshot(TSnapRangeArray **fsrArr) {
|
||||
if (fsrArr[0]) {
|
||||
TARRAY2_DESTROY(fsrArr[0], tsdbTSnapRangeClear);
|
||||
taosMemoryFreeClear(fsrArr[0]);
|
||||
fsrArr[0] = NULL;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
const char *gFSBgTaskName[] = {NULL, "MERGE", "RETENTION", "COMPACT"};
|
||||
|
||||
static int32_t tsdbFSRunBgTask(void *arg) {
|
||||
|
|
|
@ -52,7 +52,10 @@ int32_t tsdbCloseFS(STFileSystem **fs);
|
|||
int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr);
|
||||
int32_t tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr);
|
||||
int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr);
|
||||
int32_t tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr);
|
||||
int32_t tsdbFSDestroyRefSnapshot(TFileSetArray **fsrArr);
|
||||
|
||||
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, TSnapRangeArray **fsrArr);
|
||||
int32_t tsdbFSDestroyRefRangedSnapshot(TSnapRangeArray **fsrArr);
|
||||
// txn
|
||||
int64_t tsdbFSAllocEid(STFileSystem *fs);
|
||||
int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype);
|
||||
|
|
|
@ -458,6 +458,16 @@ int32_t tsdbTFileSetInitDup(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fs
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbTSnapRangeInitRef(STsdb *pTsdb, const STFileSet *fset1, STSnapRange **fsr) {
|
||||
fsr[0] = taosMemoryCalloc(1, sizeof(STSnapRange));
|
||||
if (fsr[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
fsr[0]->fid = fset1->fid;
|
||||
// fsr[0]->sver = sver;
|
||||
// fsr[0]->ever = ever;
|
||||
return tsdbTFileSetInitRef(pTsdb, fset1, &fsr[0]->fset);
|
||||
}
|
||||
|
||||
int32_t tsdbTFileSetInitRef(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset) {
|
||||
int32_t code = tsdbTFileSetInit(fset1->fid, fset);
|
||||
if (code) return code;
|
||||
|
@ -485,6 +495,15 @@ int32_t tsdbTFileSetInitRef(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fs
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbTSnapRangeClear(STSnapRange **fsr) {
|
||||
if (!fsr[0]) return 0;
|
||||
|
||||
tsdbTFileSetClear(&fsr[0]->fset);
|
||||
taosMemoryFree(fsr[0]);
|
||||
fsr[0] = NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbTFileSetClear(STFileSet **fset) {
|
||||
if (!fset[0]) return 0;
|
||||
|
||||
|
@ -545,4 +564,4 @@ bool tsdbTFileSetIsEmpty(const STFileSet *fset) {
|
|||
if (fset->farr[ftype] != NULL) return false;
|
||||
}
|
||||
return TARRAY2_SIZE(fset->lvlArr) == 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,6 +45,10 @@ int32_t tsdbTFileSetInitDup(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fs
|
|||
int32_t tsdbTFileSetInitRef(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset);
|
||||
int32_t tsdbTFileSetClear(STFileSet **fset);
|
||||
int32_t tsdbTFileSetRemove(STFileSet **fset);
|
||||
|
||||
int32_t tsdbTSnapRangeInitRef(STsdb *pTsdb, const STFileSet *fset1, STSnapRange **fsr);
|
||||
int32_t tsdbTSnapRangeClear(STSnapRange **fsr);
|
||||
|
||||
// to/from json
|
||||
int32_t tsdbTFileSetToJson(const STFileSet *fset, cJSON *json);
|
||||
int32_t tsdbJsonToTFileSet(STsdb *pTsdb, const cJSON *json, STFileSet **fset);
|
||||
|
@ -78,8 +82,15 @@ struct STFileSet {
|
|||
TSttLvlArray lvlArr[1]; // level array
|
||||
};
|
||||
|
||||
struct STSnapRange {
|
||||
int32_t fid;
|
||||
int64_t sver;
|
||||
int64_t ever;
|
||||
STFileSet *fset;
|
||||
};
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TSDB_FILE_SET2_H*/
|
||||
#endif /*_TSDB_FILE_SET2_H*/
|
||||
|
|
|
@ -61,6 +61,8 @@ struct STFile {
|
|||
int32_t fid; // file id
|
||||
int64_t cid; // commit id
|
||||
int64_t size;
|
||||
int64_t minVer;
|
||||
int64_t maxVer;
|
||||
union {
|
||||
struct {
|
||||
int32_t level;
|
||||
|
@ -80,4 +82,4 @@ struct STFileObj {
|
|||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TSDB_FILE_H*/
|
||||
#endif /*_TSDB_FILE_H*/
|
||||
|
|
|
@ -32,12 +32,12 @@ struct STsdbSnapReader {
|
|||
uint8_t* aBuf[5];
|
||||
SSkmInfo skmTb[1];
|
||||
|
||||
TFileSetArray* fsetArr;
|
||||
TSnapRangeArray* fsrArr;
|
||||
|
||||
// context
|
||||
struct {
|
||||
int32_t fsetArrIdx;
|
||||
STFileSet* fset;
|
||||
int32_t fsrArrIdx;
|
||||
STSnapRange* fsr;
|
||||
bool isDataDone;
|
||||
bool isTombDone;
|
||||
} ctx[1];
|
||||
|
@ -72,10 +72,10 @@ static int32_t tsdbSnapReadFileSetOpenReader(STsdbSnapReader* reader) {
|
|||
};
|
||||
bool hasDataFile = false;
|
||||
for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) {
|
||||
if (reader->ctx->fset->farr[ftype] != NULL) {
|
||||
if (reader->ctx->fsr->fset->farr[ftype] != NULL) {
|
||||
hasDataFile = true;
|
||||
config.files[ftype].exist = true;
|
||||
config.files[ftype].file = reader->ctx->fset->farr[ftype]->f[0];
|
||||
config.files[ftype].file = reader->ctx->fsr->fset->farr[ftype]->f[0];
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -86,7 +86,7 @@ static int32_t tsdbSnapReadFileSetOpenReader(STsdbSnapReader* reader) {
|
|||
|
||||
// stt
|
||||
SSttLvl* lvl;
|
||||
TARRAY2_FOREACH(reader->ctx->fset->lvlArr, lvl) {
|
||||
TARRAY2_FOREACH(reader->ctx->fsr->fset->lvlArr, lvl) {
|
||||
STFileObj* fobj;
|
||||
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
|
||||
SSttFileReader* sttReader;
|
||||
|
@ -211,14 +211,14 @@ static int32_t tsdbSnapReadFileSetCloseIter(STsdbSnapReader* reader) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapReadFileSetBegin(STsdbSnapReader* reader) {
|
||||
static int32_t tsdbSnapReadRangeBegin(STsdbSnapReader* reader) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
ASSERT(reader->ctx->fset == NULL);
|
||||
ASSERT(reader->ctx->fsr == NULL);
|
||||
|
||||
if (reader->ctx->fsetArrIdx < TARRAY2_SIZE(reader->fsetArr)) {
|
||||
reader->ctx->fset = TARRAY2_GET(reader->fsetArr, reader->ctx->fsetArrIdx++);
|
||||
if (reader->ctx->fsrArrIdx < TARRAY2_SIZE(reader->fsrArr)) {
|
||||
reader->ctx->fsr = TARRAY2_GET(reader->fsrArr, reader->ctx->fsrArrIdx++);
|
||||
reader->ctx->isDataDone = false;
|
||||
reader->ctx->isTombDone = false;
|
||||
|
||||
|
@ -236,10 +236,10 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapReadFileSetEnd(STsdbSnapReader* reader) {
|
||||
static int32_t tsdbSnapReadRangeEnd(STsdbSnapReader* reader) {
|
||||
tsdbSnapReadFileSetCloseIter(reader);
|
||||
tsdbSnapReadFileSetCloseReader(reader);
|
||||
reader->ctx->fset = NULL;
|
||||
reader->ctx->fsr = NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -424,17 +424,14 @@ int32_t tsdbSnapReaderOpen(STsdb* tsdb, int64_t sver, int64_t ever, int8_t type,
|
|||
reader[0]->ever = ever;
|
||||
reader[0]->type = type;
|
||||
|
||||
taosThreadRwlockRdlock(&tsdb->rwLock);
|
||||
code = tsdbFSCreateRefSnapshot(tsdb->pFS, &reader[0]->fsetArr);
|
||||
taosThreadRwlockUnlock(&tsdb->rwLock);
|
||||
|
||||
code = tsdbFSCreateRefRangedSnapshot(tsdb->pFS, &reader[0]->fsrArr);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode),
|
||||
__func__, lino, tstrerror(code), sver, ever, type);
|
||||
tsdbFSDestroyRefSnapshot(&reader[0]->fsetArr);
|
||||
tsdbFSDestroyRefRangedSnapshot(&reader[0]->fsrArr);
|
||||
taosMemoryFree(reader[0]);
|
||||
reader[0] = NULL;
|
||||
} else {
|
||||
|
@ -462,7 +459,7 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** reader) {
|
|||
TARRAY2_DESTROY(reader[0]->sttReaderArr, tsdbSttFileReaderClose);
|
||||
tsdbDataFileReaderClose(&reader[0]->dataReader);
|
||||
|
||||
tsdbFSDestroyRefSnapshot(&reader[0]->fsetArr);
|
||||
tsdbFSDestroyRefRangedSnapshot(&reader[0]->fsrArr);
|
||||
tDestroyTSchema(reader[0]->skmTb->pTSchema);
|
||||
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->aBuf); ++i) {
|
||||
|
@ -488,11 +485,11 @@ int32_t tsdbSnapRead(STsdbSnapReader* reader, uint8_t** data) {
|
|||
data[0] = NULL;
|
||||
|
||||
for (;;) {
|
||||
if (reader->ctx->fset == NULL) {
|
||||
code = tsdbSnapReadFileSetBegin(reader);
|
||||
if (reader->ctx->fsr == NULL) {
|
||||
code = tsdbSnapReadRangeBegin(reader);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (reader->ctx->fset == NULL) {
|
||||
if (reader->ctx->fsr == NULL) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -517,7 +514,7 @@ int32_t tsdbSnapRead(STsdbSnapReader* reader, uint8_t** data) {
|
|||
}
|
||||
}
|
||||
|
||||
code = tsdbSnapReadFileSetEnd(reader);
|
||||
code = tsdbSnapReadRangeEnd(reader);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue