From b8bc052aa48bd7a955cb9103bfecb14ea5e4ccf6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 29 Jun 2022 09:40:31 +0800 Subject: [PATCH] refactor(query): do some internal refactor. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 63 +++++++++++++++++++++----- 1 file changed, 51 insertions(+), 12 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 455807a347..33c96d2125 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -129,6 +129,8 @@ typedef struct SReaderStatus { SHashObj* pTableMap; // SHash STableBlockScanInfo* pTableIter; // table iterator used in building in-memory buffer data blocks. SFileBlockDumpInfo fBlockDumpInfo; + + SDFileSet* pCurrentFileSet; // current opened file set SBlockData fileBlockData; SFileSetIter fileIter; SDataBlockIter blockIter; @@ -352,6 +354,8 @@ static int32_t initFileIterator(SFileSetIter* pIter, const STsdbFSState* pFState static void resetDataBlockIterator(SDataBlockIter* pIter) { pIter->numOfBlocks = -1; + pIter->index = -1; + pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo)); } static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader* pReader) { @@ -363,13 +367,14 @@ static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader* // check file the time range of coverage STimeWindow win = {0}; - SDFileSet *pDFile = (SDFileSet *)taosArrayGet(pIter->pFileList, pIter->index); + pReader->status.pCurrentFileSet = (SDFileSet *)taosArrayGet(pIter->pFileList, pIter->index); - int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pDFile); + int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileSet); if (code != TSDB_CODE_SUCCESS) { goto _err; } + // todo file range check // tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey); // current file are not overlapped with query time window, ignore remain files @@ -380,6 +385,8 @@ static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader* return false; } + return true; + _err: return false; } @@ -884,10 +891,11 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, pScanInfo->pBlockList = taosArrayInit(16, sizeof(SBlock)); } + pScanInfo->blockIdx = blockIndex; taosArrayPush(pIndexList, &blockIndex); } - tMapDataClear(&blockIdxMap); +// tMapDataClear(&blockIdxMap); return TSDB_CODE_SUCCESS; _err: @@ -895,7 +903,7 @@ _err: return code; } -static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_t* numOfValidTables) { +static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_t* numOfValidTables, int32_t* numOfBlocks) { size_t numOfTables = taosArrayGetSize(pIndexList); *numOfValidTables = 0; @@ -917,6 +925,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_ return code; } + // 1. time range check if ((ASCENDING_TRAVERSE(pReader->order) && (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey)) || (!ASCENDING_TRAVERSE(pReader->order) && @@ -924,10 +933,17 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_ continue; } + // 2. version range check +// if (block.minVersion > pReader->startVersion || block.maxVersion < pReader->endVersion) { +// continue; +// } + void* p = taosArrayPush(pScanInfo->pBlockList, &block); if (p == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } + + (*numOfBlocks) += 1; } if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) { @@ -940,7 +956,6 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { int64_t st = taosGetTimestampUs(); - int32_t numOfCols = taosArrayGetSize(pReader->pResBlock->pDataBlock); SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx); @@ -1983,7 +1998,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte int32_t cnt = 0; void* ptr = NULL; while(1) { - ptr = taosHashIterate(pReader->status.pTableMap, &ptr); + ptr = taosHashIterate(pReader->status.pTableMap, ptr); if (ptr == NULL) { break; } @@ -2025,6 +2040,8 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte } tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted %s", pReader, cnt, pReader->idStr); + + pBlockIter->index = 0; return TSDB_CODE_SUCCESS; } @@ -2428,18 +2445,29 @@ static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* return TSDB_CODE_SUCCESS; } + int32_t code = TSDB_CODE_SUCCESS; TSDBKEY startKey = {.ts = pReader->window.skey, .version = pReader->startVersion}; STbData* d = NULL; if (pReader->pTsdb->mem != NULL) { tsdbGetTbDataFromMemTable(pReader->pTsdb->mem, pReader->suid, pBlockScanInfo->uid, &d); - tsdbTbDataIterCreate(d, &startKey, 0, &pBlockScanInfo->iter); + if (d != NULL) { + code = tsdbTbDataIterCreate(d, &startKey, 0, &pBlockScanInfo->iter); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } } STbData* di = NULL; if (pReader->pTsdb->imem != NULL) { tsdbGetTbDataFromMemTable(pReader->pTsdb->imem, pReader->suid, pBlockScanInfo->uid, &di); - tsdbTbDataIterCreate(di, &startKey, 0, &pBlockScanInfo->iiter); + if (di != NULL) { + code = tsdbTbDataIterCreate(di, &startKey, 0, &pBlockScanInfo->iiter); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } } pBlockScanInfo->iterInit = true; @@ -2494,7 +2522,7 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) { if (taosArrayGetSize(pIndexList) > 0) { uint32_t numOfValidTable = 0; - code = doLoadFileBlock(pReader, pIndexList, &numOfValidTable); + code = doLoadFileBlock(pReader, pIndexList, &numOfValidTable, &numOfBlocks); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2539,6 +2567,7 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) { pInfo->window.skey = pBlock->minKey.ts; pInfo->window.ekey = pBlock->maxKey.ts; setComposedBlockFlag(pReader, false); + *exists = true; } } else { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; @@ -3167,7 +3196,15 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { if (pStatus->loadFromFile) { bool exists = true; int32_t code = loadDataInFiles(pReader, &exists); + if (code != TSDB_CODE_SUCCESS) { + return false; + } + if (exists) { + return true; + } else { // let's try the in-memory buffer + + } } else { // no data in files, let's try the buffer while(1) { if (pStatus->pTableIter == NULL) { @@ -3323,10 +3360,12 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); - SBlockData data = {0}; - doLoadFileBlockData(pReader, &pReader->status.blockIter, pBlockScanInfo, &data); + int32_t code = tBlockDataInit(&pReader->status.fileBlockData); + doLoadFileBlockData(pReader, &pReader->status.blockIter, pBlockScanInfo, &pReader->status.fileBlockData); +// TSDBROW row = tsdbRowFromBlockData(&pReader->status.fileBlockData, 0); +// doAppendOneRow(pReader->pResBlock, pReader, row.); - // todo convert blockData to ssdatablock + return pReader->pResBlock->pDataBlock; } // /** // * In the following two cases, the data has been loaded to SColumnInfoData.