tsdb/cache: reuse pDataFileReader for same fileset
This commit is contained in:
parent
968cfd9e65
commit
424dd40078
|
@ -877,6 +877,7 @@ typedef struct SCacheRowsReader {
|
||||||
SSHashObj *pTableMap;
|
SSHashObj *pTableMap;
|
||||||
SArray *pLDataIterArray;
|
SArray *pLDataIterArray;
|
||||||
struct SDataFileReader *pFileReader;
|
struct SDataFileReader *pFileReader;
|
||||||
|
STFileSet *pCurFileSet;
|
||||||
STsdbReadSnap *pReadSnap;
|
STsdbReadSnap *pReadSnap;
|
||||||
char *idstr;
|
char *idstr;
|
||||||
int64_t lastTs;
|
int64_t lastTs;
|
||||||
|
|
|
@ -1949,35 +1949,39 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
||||||
|
|
||||||
STFileObj **pFileObj = state->pFileSet->farr;
|
STFileObj **pFileObj = state->pFileSet->farr;
|
||||||
if (pFileObj[0] != NULL || pFileObj[3] != NULL) {
|
if (pFileObj[0] != NULL || pFileObj[3] != NULL) {
|
||||||
SDataFileReaderConfig conf = {.tsdb = state->pTsdb, .szPage = state->pTsdb->pVnode->config.szPage};
|
if (state->pFileSet != state->pr->pCurFileSet) {
|
||||||
const char *filesName[4] = {0};
|
SDataFileReaderConfig conf = {.tsdb = state->pTsdb, .szPage = state->pTsdb->pVnode->config.szPage};
|
||||||
if (pFileObj[0] != NULL) {
|
const char *filesName[4] = {0};
|
||||||
conf.files[0].file = *pFileObj[0]->f;
|
if (pFileObj[0] != NULL) {
|
||||||
conf.files[0].exist = true;
|
conf.files[0].file = *pFileObj[0]->f;
|
||||||
filesName[0] = pFileObj[0]->fname;
|
conf.files[0].exist = true;
|
||||||
|
filesName[0] = pFileObj[0]->fname;
|
||||||
|
|
||||||
conf.files[1].file = *pFileObj[1]->f;
|
conf.files[1].file = *pFileObj[1]->f;
|
||||||
conf.files[1].exist = true;
|
conf.files[1].exist = true;
|
||||||
filesName[1] = pFileObj[1]->fname;
|
filesName[1] = pFileObj[1]->fname;
|
||||||
|
|
||||||
conf.files[2].file = *pFileObj[2]->f;
|
conf.files[2].file = *pFileObj[2]->f;
|
||||||
conf.files[2].exist = true;
|
conf.files[2].exist = true;
|
||||||
filesName[2] = pFileObj[2]->fname;
|
filesName[2] = pFileObj[2]->fname;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pFileObj[3] != NULL) {
|
||||||
|
conf.files[3].exist = true;
|
||||||
|
conf.files[3].file = *pFileObj[3]->f;
|
||||||
|
filesName[3] = pFileObj[3]->fname;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tsdbDataFileReaderOpen(filesName, &conf, &state->pr->pFileReader);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
loadDataTomb(state->pr, state->pr->pFileReader);
|
||||||
|
|
||||||
|
state->pr->pCurFileSet = state->pFileSet;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pFileObj[3] != NULL) {
|
|
||||||
conf.files[3].exist = true;
|
|
||||||
conf.files[3].file = *pFileObj[3]->f;
|
|
||||||
filesName[3] = pFileObj[3]->fname;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tsdbDataFileReaderOpen(filesName, &conf, &state->pr->pFileReader);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
loadDataTomb(state->pr, state->pr->pFileReader);
|
|
||||||
|
|
||||||
if (!state->pIndexList) {
|
if (!state->pIndexList) {
|
||||||
state->pIndexList = taosArrayInit(1, sizeof(SBrinBlk));
|
state->pIndexList = taosArrayInit(1, sizeof(SBrinBlk));
|
||||||
} else {
|
} else {
|
||||||
|
@ -2218,19 +2222,6 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t clearNextRowFromFS(void *iter) {
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
SFSNextRowIter *state = (SFSNextRowIter *)iter;
|
|
||||||
if (!state) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
clearLastFileSet(state);
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef enum SMEMNEXTROWSTATES {
|
typedef enum SMEMNEXTROWSTATES {
|
||||||
SMEMNEXTROW_ENTER,
|
SMEMNEXTROW_ENTER,
|
||||||
SMEMNEXTROW_NEXT,
|
SMEMNEXTROW_NEXT,
|
||||||
|
@ -2350,6 +2341,36 @@ typedef struct CacheNextRowIter {
|
||||||
STsdb *pTsdb;
|
STsdb *pTsdb;
|
||||||
} CacheNextRowIter;
|
} CacheNextRowIter;
|
||||||
|
|
||||||
|
int32_t clearNextRowFromFS(void *iter) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
SFSNextRowIter *state = (SFSNextRowIter *)iter;
|
||||||
|
if (!state) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (state->pLastIter) {
|
||||||
|
lastIterClose(&state->pLastIter);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (state->pBlockData) {
|
||||||
|
tBlockDataDestroy(state->pBlockData);
|
||||||
|
state->pBlockData = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (state->pTSRow) {
|
||||||
|
taosMemoryFree(state->pTSRow);
|
||||||
|
state->pTSRow = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (state->pRowIter->pSkyline) {
|
||||||
|
taosArrayDestroy(state->pRowIter->pSkyline);
|
||||||
|
state->pRowIter->pSkyline = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static void clearLastFileSet(SFSNextRowIter *state) {
|
static void clearLastFileSet(SFSNextRowIter *state) {
|
||||||
if (state->pLastIter) {
|
if (state->pLastIter) {
|
||||||
lastIterClose(&state->pLastIter);
|
lastIterClose(&state->pLastIter);
|
||||||
|
@ -2363,6 +2384,8 @@ static void clearLastFileSet(SFSNextRowIter *state) {
|
||||||
if (state->pr->pFileReader) {
|
if (state->pr->pFileReader) {
|
||||||
tsdbDataFileReaderClose(&state->pr->pFileReader);
|
tsdbDataFileReaderClose(&state->pr->pFileReader);
|
||||||
state->pr->pFileReader = NULL;
|
state->pr->pFileReader = NULL;
|
||||||
|
|
||||||
|
state->pr->pCurFileSet = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (state->pTSRow) {
|
if (state->pTSRow) {
|
||||||
|
|
Loading…
Reference in New Issue