tsdb/cache: move data file reader for fs iter to cache reader

This commit is contained in:
Minglei Jin 2023-07-20 09:11:26 +08:00
parent 94d8286fad
commit 48222f5dc3
3 changed files with 39 additions and 30 deletions

View File

@ -855,6 +855,8 @@ typedef struct {
SArray *pTombData;
} STableLoadInfo;
struct SDataFileReader;
typedef struct SCacheRowsReader {
STsdb *pTsdb;
STsdbReaderInfo info;
@ -874,6 +876,7 @@ typedef struct SCacheRowsReader {
uint64_t *uidList;
SSHashObj *pTableMap;
SArray *pLDataIterArray;
struct SDataFileReader *pFileReader;
STsdbReadSnap *pReadSnap;
char *idstr;
int64_t lastTs;

View File

@ -1900,7 +1900,6 @@ typedef struct SFSNextRowIter {
int32_t iFileSet;
STFileSet *pFileSet;
TFileSetArray *aDFileSet;
SDataFileReader *pFileReader;
SArray *pIndexList;
int32_t iBrinIndex;
SBrinBlock brinBlock;
@ -1970,7 +1969,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
filesName[3] = pFileObj[3]->fname;
}
code = tsdbDataFileReaderOpen(filesName, &conf, &state->pFileReader);
code = tsdbDataFileReaderOpen(filesName, &conf, &state->pr->pFileReader);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
@ -1983,7 +1982,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
state->pLastIter = &state->lastIter;
loadDataTomb(state->pr, state->pFileReader);
loadDataTomb(state->pr, state->pr->pFileReader);
if (!state->pIndexList) {
state->pIndexList = taosArrayInit(1, sizeof(SBrinBlk));
@ -1992,7 +1991,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
}
const TBrinBlkArray *pBlkArray = NULL;
int32_t code = tsdbDataFileReadBrinBlk(state->pFileReader, &pBlkArray);
int32_t code = tsdbDataFileReadBrinBlk(state->pr->pFileReader, &pBlkArray);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
@ -2034,7 +2033,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
pBrinBlk = taosArrayGet(state->pIndexList, state->iBrinIndex);
}
code = tsdbDataFileReadBrinBlock(state->pFileReader, pBrinBlk, &state->brinBlock);
code = tsdbDataFileReadBrinBlock(state->pr->pFileReader, pBrinBlk, &state->brinBlock);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
@ -2079,7 +2078,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
--nCols;
++aCols;
}
code = tsdbDataFileReadBlockDataByColumn(state->pFileReader, pRecord, state->pBlockData, state->pTSchema, aCols,
code = tsdbDataFileReadBlockDataByColumn(state->pr->pFileReader, pRecord, state->pBlockData, state->pTSchema, aCols,
nCols);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
@ -2315,8 +2314,9 @@ static void clearLastFileSet(SFSNextRowIter *state) {
state->pBlockData = NULL;
}
if (state->pFileReader) {
tsdbDataFileReaderClose(&state->pFileReader);
if (state->pr->pFileReader) {
tsdbDataFileReaderClose(&state->pr->pFileReader);
state->pr->pFileReader = NULL;
}
if (state->pTSRow) {

View File

@ -17,6 +17,7 @@
#include "tarray.h"
#include "tcommon.h"
#include "tsdb.h"
#include "tsdbDataFileRW.h"
#define HASTYPE(_type, _t) (((_type) & (_t)) == (_t))
@ -253,6 +254,11 @@ void* tsdbCacherowsReaderClose(void* pReader) {
double elapse = 0;
destroySttBlockReader(p->pLDataIterArray, &loadBlocks, &elapse);
if (p->pFileReader) {
tsdbDataFileReaderClose(&p->pFileReader);
p->pFileReader = NULL;
}
taosMemoryFree((void*)p->idstr);
taosThreadMutexDestroy(&p->readerMutex);