From 48222f5dc3ba2752fc027a5afea2b06960bc8652 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 20 Jul 2023 09:11:26 +0800 Subject: [PATCH] tsdb/cache: move data file reader for fs iter to cache reader --- source/dnode/vnode/src/inc/tsdb.h | 47 +++++++++++---------- source/dnode/vnode/src/tsdb/tsdbCache.c | 16 +++---- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 6 +++ 3 files changed, 39 insertions(+), 30 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 79e560df4b..ad72c5924c 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -712,7 +712,7 @@ typedef struct SSttBlockLoadInfo { SBlockData blockData[2]; void *pSttStatisBlkArray; SArray *aSttBlk; - int32_t blockIndex[2]; // to denote the loaded block in the corresponding position. + int32_t blockIndex[2]; // to denote the loaded block in the corresponding position. int32_t currentLoadBlockIndex; int32_t loadBlocks; double elapsedTime; @@ -855,28 +855,31 @@ typedef struct { SArray *pTombData; } STableLoadInfo; +struct SDataFileReader; + typedef struct SCacheRowsReader { - STsdb *pTsdb; - STsdbReaderInfo info; - TdThreadMutex readerMutex; - SVnode *pVnode; - STSchema *pSchema; - STSchema *pCurrSchema; - uint64_t uid; - char **transferBuf; // todo remove it soon - int32_t numOfCols; - SArray *pCidList; - int32_t *pSlotIds; - int32_t type; - int32_t tableIndex; // currently returned result tables - STableKeyInfo *pTableList; // table id list - int32_t numOfTables; - uint64_t *uidList; - SSHashObj *pTableMap; - SArray *pLDataIterArray; - STsdbReadSnap *pReadSnap; - char *idstr; - int64_t lastTs; + STsdb *pTsdb; + STsdbReaderInfo info; + TdThreadMutex readerMutex; + SVnode *pVnode; + STSchema *pSchema; + STSchema *pCurrSchema; + uint64_t uid; + char **transferBuf; // todo remove it soon + int32_t numOfCols; + SArray *pCidList; + int32_t *pSlotIds; + int32_t type; + int32_t tableIndex; // currently returned result tables + STableKeyInfo *pTableList; // table id list + int32_t numOfTables; + uint64_t *uidList; + SSHashObj *pTableMap; + SArray *pLDataIterArray; + struct SDataFileReader *pFileReader; + STsdbReadSnap *pReadSnap; + char *idstr; + int64_t lastTs; } SCacheRowsReader; typedef struct { diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index c3fc100b02..88b2c9781c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1900,7 +1900,6 @@ typedef struct SFSNextRowIter { int32_t iFileSet; STFileSet *pFileSet; TFileSetArray *aDFileSet; - SDataFileReader *pFileReader; SArray *pIndexList; int32_t iBrinIndex; SBrinBlock brinBlock; @@ -1970,7 +1969,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie filesName[3] = pFileObj[3]->fname; } - code = tsdbDataFileReaderOpen(filesName, &conf, &state->pFileReader); + code = tsdbDataFileReaderOpen(filesName, &conf, &state->pr->pFileReader); if (code != TSDB_CODE_SUCCESS) { goto _err; } @@ -1983,7 +1982,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie state->pLastIter = &state->lastIter; - loadDataTomb(state->pr, state->pFileReader); + loadDataTomb(state->pr, state->pr->pFileReader); if (!state->pIndexList) { state->pIndexList = taosArrayInit(1, sizeof(SBrinBlk)); @@ -1992,7 +1991,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie } const TBrinBlkArray *pBlkArray = NULL; - int32_t code = tsdbDataFileReadBrinBlk(state->pFileReader, &pBlkArray); + int32_t code = tsdbDataFileReadBrinBlk(state->pr->pFileReader, &pBlkArray); if (code != TSDB_CODE_SUCCESS) { goto _err; } @@ -2034,7 +2033,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie pBrinBlk = taosArrayGet(state->pIndexList, state->iBrinIndex); } - code = tsdbDataFileReadBrinBlock(state->pFileReader, pBrinBlk, &state->brinBlock); + code = tsdbDataFileReadBrinBlock(state->pr->pFileReader, pBrinBlk, &state->brinBlock); if (code != TSDB_CODE_SUCCESS) { goto _err; } @@ -2079,7 +2078,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie --nCols; ++aCols; } - code = tsdbDataFileReadBlockDataByColumn(state->pFileReader, pRecord, state->pBlockData, state->pTSchema, aCols, + code = tsdbDataFileReadBlockDataByColumn(state->pr->pFileReader, pRecord, state->pBlockData, state->pTSchema, aCols, nCols); if (code != TSDB_CODE_SUCCESS) { goto _err; @@ -2315,8 +2314,9 @@ static void clearLastFileSet(SFSNextRowIter *state) { state->pBlockData = NULL; } - if (state->pFileReader) { - tsdbDataFileReaderClose(&state->pFileReader); + if (state->pr->pFileReader) { + tsdbDataFileReaderClose(&state->pr->pFileReader); + state->pr->pFileReader = NULL; } if (state->pTSRow) { diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index a74c38211d..f17041e98b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -17,6 +17,7 @@ #include "tarray.h" #include "tcommon.h" #include "tsdb.h" +#include "tsdbDataFileRW.h" #define HASTYPE(_type, _t) (((_type) & (_t)) == (_t)) @@ -253,6 +254,11 @@ void* tsdbCacherowsReaderClose(void* pReader) { double elapse = 0; destroySttBlockReader(p->pLDataIterArray, &loadBlocks, &elapse); + if (p->pFileReader) { + tsdbDataFileReaderClose(&p->pFileReader); + p->pFileReader = NULL; + } + taosMemoryFree((void*)p->idstr); taosThreadMutexDestroy(&p->readerMutex);