From a7cbb93a3858c32a9a733ab2ae51375be22fc989 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 3 Jul 2022 23:00:12 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 338 +++++++++++++++---------- 1 file changed, 208 insertions(+), 130 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 0a232bc02a..d87545f86a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -148,7 +148,7 @@ static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, STsdbReader* pReader); static TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader); static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger); -static int32_t doLoadRowsOfIdenticalTsInBuf(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader); +static int32_t doMergeRowsInBuf(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader); static int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow); static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader); @@ -422,7 +422,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd #if 1 if (pReader->window.skey > pReader->window.ekey) { - TSWAP(pReader->window.skey, pReader->window.ekey); +// TSWAP(pReader->window.skey, pReader->window.ekey); } #endif @@ -449,15 +449,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd setColumnIdSlotList(pReader, pReader->pResBlock); - STsdbFSState* pFState = pReader->pTsdb->fs->cState; - initFileIterator(&pReader->status.fileIter, pFState, pReader->order, pReader->idStr); - resetDataBlockIterator(&pReader->status.blockIter, pReader->order); - - // no data in files, let's try buffer in memory - if (pReader->status.fileIter.numOfFiles == 0) { - pReader->status.loadFromFile = false; - } - *ppReader = pReader; return code; @@ -722,14 +713,11 @@ _end: // } static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) { - int32_t code = 0; - bool asc = ASCENDING_TRAVERSE(pReader->order); - SArray *aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); - code = tsdbReadBlockIdx(pFileReader, aBlockIdx, NULL); + int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx, NULL); if (code != TSDB_CODE_SUCCESS) { - goto _err; + goto _end; } if (taosArrayGetSize(aBlockIdx) == 0) { @@ -741,6 +729,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, for (int32_t i = 0; i < taosArrayGetSize(aBlockIdx); ++i) { pBlockIdx = (SBlockIdx *)taosArrayGet(aBlockIdx, i); + // uid check if (pBlockIdx->suid != pReader->suid) { continue; } @@ -751,10 +740,16 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, continue; } - if ((asc && (pBlockIdx->minKey > pReader->window.ekey || pBlockIdx->maxKey < pReader->window.skey)) /*|| - (!asc && (pBlockIdx->minKey > pReader->window.skey || pBlockIdx->maxKey < pReader->window.ekey))*/) { - continue; - } + // todo: not valid info in bockIndex + // time range check +// if (pBlockIdx->minKey > pReader->window.ekey || pBlockIdx->maxKey < pReader->window.skey) { +// continue; +// } + + // version check +// if (pBlockIdx->minVersion > pReader->verRange.maxVer || pBlockIdx->maxVersion < pReader->verRange.minVer) { +// continue; +// } STableBlockScanInfo* pScanInfo = p; if (pScanInfo->pBlockList == NULL) { @@ -765,10 +760,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, taosArrayPush(pIndexList, pBlockIdx); } - taosArrayDestroy(aBlockIdx); - return TSDB_CODE_SUCCESS; - -_err: +_end: taosArrayDestroy(aBlockIdx); return code; } @@ -1764,51 +1756,52 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte bool asc = ASCENDING_TRAVERSE(pReader->order); pBlockIter->numOfBlocks = numOfBlocks; - // access data blocks according to the offset of each block in asc/desc order. - int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap); + // access data blocks according to the offset of each block in asc/desc order. + int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap); - SBlockOrderSupporter sup = {0}; - int32_t code = initBlockOrderSupporter(&sup, numOfTables); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + SBlockOrderSupporter sup = {0}; - int32_t cnt = 0; - void* ptr = NULL; - while(1) { - ptr = taosHashIterate(pReader->status.pTableMap, ptr); - if (ptr == NULL) { - break; - } + int32_t code = initBlockOrderSupporter(&sup, numOfTables); + if (code != TSDB_CODE_SUCCESS) { + return code; + } - STableBlockScanInfo* pTableScanInfo = (STableBlockScanInfo*)ptr; - if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) { - continue; - } + int32_t cnt = 0; + void* ptr = NULL; + while (1) { + ptr = taosHashIterate(pReader->status.pTableMap, ptr); + if (ptr == NULL) { + break; + } - size_t num = taosArrayGetSize(pTableScanInfo->pBlockList); - sup.numOfBlocksPerTable[sup.numOfTables] = num; + STableBlockScanInfo* pTableScanInfo = (STableBlockScanInfo*)ptr; + if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) { + continue; + } - char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num); - if (buf == NULL) { - cleanupBlockOrderSupporter(&sup); - return TSDB_CODE_TDB_OUT_OF_MEMORY; - } + size_t num = taosArrayGetSize(pTableScanInfo->pBlockList); + sup.numOfBlocksPerTable[sup.numOfTables] = num; - sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf; - for (int32_t k = 0; k < num; ++k) { - SBlockOrderWrapper wrapper = {0}; - wrapper.pBlock = (SBlock*)taosArrayGet(pTableScanInfo->pBlockList, k); - wrapper.uid = pTableScanInfo->uid; + char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num); + if (buf == NULL) { + cleanupBlockOrderSupporter(&sup); + return TSDB_CODE_TDB_OUT_OF_MEMORY; + } - sup.pDataBlockInfo[sup.numOfTables][k] = wrapper; - cnt++; - } + sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf; + for (int32_t k = 0; k < num; ++k) { + SBlockOrderWrapper wrapper = {0}; + wrapper.pBlock = (SBlock*)taosArrayGet(pTableScanInfo->pBlockList, k); + wrapper.uid = pTableScanInfo->uid; - sup.numOfTables += 1; - } + sup.pDataBlockInfo[sup.numOfTables][k] = wrapper; + cnt++; + } - ASSERT(numOfBlocks == cnt); + sup.numOfTables += 1; + } + + ASSERT(numOfBlocks == cnt); // since there is only one table qualified, blocks are not sorted if (sup.numOfTables == 1) { @@ -2123,7 +2116,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); tRowMerge(&merge, pRow); - doLoadRowsOfIdenticalTsInBuf(pIter, hasVal, k.ts, &merge, pReader); + doMergeRowsInBuf(pIter, hasVal, k.ts, &merge, pReader); tRowMergerGetRow(&merge, &pTSRow); } @@ -2139,7 +2132,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* updateSchema(pRow, pBlockScanInfo->uid, pReader); tRowMergerInit(&merge, pRow, pReader->pSchema); - doLoadRowsOfIdenticalTsInBuf(pIter, hasVal, k.ts, &merge, pReader); + doMergeRowsInBuf(pIter, hasVal, k.ts, &merge, pReader); tRowMerge(&merge, &fRow); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); @@ -2162,6 +2155,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader); TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader); + ASSERT(pRow != NULL && piRow != NULL); int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; @@ -2180,12 +2174,12 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (ik.ts == key) { tRowMerge(&merge, piRow); - doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, key, &merge, pReader); + doMergeRowsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, key, &merge, pReader); } if (k.ts == key) { tRowMerge(&merge, pRow); - doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, key, &merge, pReader); + doMergeRowsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, key, &merge, pReader); } tRowMergerGetRow(&merge, &pTSRow); @@ -2225,11 +2219,11 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* updateSchema(pRow, uid, pReader); tRowMergerInit(&merge, pRow, pReader->pSchema); - doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, key, &merge, pReader); + doMergeRowsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, key, &merge, pReader); if (ik.ts == k.ts) { tRowMerge(&merge, piRow); - doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, key, &merge, pReader); + doMergeRowsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, key, &merge, pReader); } if (k.ts == key) { @@ -2281,6 +2275,38 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* ASSERT(0); } +static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo, STsdbReader* pReader) { + // check for version and time range + int64_t ver = pBlockData->aVersion[pDumpInfo->rowIndex]; + if (ver > pReader->verRange.maxVer || ver < pReader->verRange.minVer) { + return false; + } + + int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex]; + if (ts > pReader->window.ekey || ts < pReader->window.skey) { + return false; + } + + return true; +} + +static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { + return (ts > pWindow->ekey) || (ts < pWindow->skey); +} + +static bool isValidTSDBRow(TSDBROW* pRow, STimeWindow* pWindow, SVersionRange* pVerRange) { + TSDBKEY key = TSDBROW_KEY(pRow); + if (outOfTimeWindow(key.ts, pWindow)) { + return false; + } + + if (key.version > pVerRange->maxVer || key.version < pVerRange->minVer) { + return false; + } + + return true; +} + static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SBlockData* pBlockData = &pReader->status.fileBlockData; @@ -2309,10 +2335,16 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI // imem & mem are all empty, only file exist TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - tRowMergerInit(&merge, &fRow, pReader->pSchema); - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); - tRowMergerGetRow(&merge, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); +// if (!isValidFileBlockRow(pBlockData, pDumpInfo, pReader)) { +// int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; +// pDumpInfo->rowIndex += step; +// } else { + tRowMergerInit(&merge, &fRow, pReader->pSchema); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + tRowMergerGetRow(&merge, &pTSRow); + doAppendOneRow(pReader->pResBlock, pReader, pTSRow); +// } + return TSDB_CODE_SUCCESS; } } @@ -2320,9 +2352,29 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) { SSDataBlock* pResBlock = pReader->pResBlock; + SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; + SBlockData* pBlockData = &pReader->status.fileBlockData; + int32_t step = ASCENDING_TRAVERSE(pReader->order)? 1:-1; + while(1) { + // todo check the validate of row in file block + { + if (!isValidFileBlockRow(pBlockData, pDumpInfo, pReader)) { + pDumpInfo->rowIndex += step; + + SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); + SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx); + + if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) { + setBlockAllDumped(pDumpInfo, pBlock, pReader->order); + break; + } + + continue; + } + } + buildComposedDataBlockImpl(pReader, pBlockScanInfo); - SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx); @@ -2346,6 +2398,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo* tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s", pReader, pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, pReader->idStr); + return TSDB_CODE_SUCCESS; } @@ -2420,13 +2473,13 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); initMemIterator(pScanInfo, pReader); - if (pScanInfo->memHasVal) { - TSDBROW* pRow = getValidRow(pScanInfo->iter, &pScanInfo->memHasVal, pReader); + TSDBROW* pRow = getValidRow(pScanInfo->iter, &pScanInfo->memHasVal, pReader); + if (pRow != NULL) { key = TSDBROW_KEY(pRow); } - if (pScanInfo->imemHasVal) { - TSDBROW* pRow = getValidRow(pScanInfo->iiter, &pScanInfo->imemHasVal, pReader); + pRow = getValidRow(pScanInfo->iiter, &pScanInfo->imemHasVal, pReader); + if (pRow != NULL) { TSDBKEY k = TSDBROW_KEY(pRow); if (key.ts > k.ts) { key = k; @@ -2490,9 +2543,6 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { // build composed data block code = buildComposedDataBlock(pReader, pScanInfo); - if (code != TSDB_CODE_SUCCESS) { - return code; - } } else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) { // data in memory that are earlier than current file block // todo rows in buffer should be less than the file block in asc, greater than file block in desc @@ -2542,12 +2592,15 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { } } +// set the correct start position in case of the first/last file block, according to the query time window static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) { - SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); + SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); - SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx); + SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx); - SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; + SReaderStatus* pStatus = &pReader->status; + + SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo; pDumpInfo->totalRows = pBlock->nRow; pDumpInfo->allDumped = false; @@ -2569,56 +2622,59 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl // initialize the block iterator for a new fileset code = initBlockIterator(pReader, pBlockIter, numOfBlocks); + + // set the correct start position according to the query time window initBlockDumpInfo(pReader, pBlockIter); return code; } +static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) { + return (!pDumpInfo->allDumped) && ((pDumpInfo->rowIndex > 0 && asc) || (pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1) && (!asc))); +} + static int32_t buildBlockFromFiles(STsdbReader* pReader) { int32_t code = TSDB_CODE_SUCCESS; + bool asc = ASCENDING_TRAVERSE(pReader->order); + SDataBlockIter* pBlockIter = &pReader->status.blockIter; - if (pReader->status.blockIter.index == -1) { - code = initForFirstBlockInFile(pReader, pBlockIter); - if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { - return code; - } - - code = doBuildDataBlock(pReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } else { + while (1) { SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); - bool asc = ASCENDING_TRAVERSE(pReader->order); - SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - // current block are exhausted, try the next file block - if (pDumpInfo->allDumped) { - // try next data block in current file - bool hasNext = blockIteratorNext(&pReader->status.blockIter); - if (hasNext) { // current file is exhausted, let's try the next file - initBlockDumpInfo(pReader, pBlockIter); - } else { - code = initForFirstBlockInFile(pReader, pBlockIter); - if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { - return code; + if (fileBlockPartiallyRead(pDumpInfo, asc)) { // file data block is partially loaded + code = buildComposedDataBlock(pReader, pScanInfo); + } else { + // current block are exhausted, try the next file block + if (pDumpInfo->allDumped) { + // try next data block in current file + bool hasNext = blockIteratorNext(&pReader->status.blockIter); + if (hasNext) { // check for the next block in the block accessed order list + initBlockDumpInfo(pReader, pBlockIter); + } else { // data blocks in current file are exhausted, let's try the next file now + code = initForFirstBlockInFile(pReader, pBlockIter); + + // error happens or all the data files are completely checked + if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { + return code; + } } } - - return doBuildDataBlock(pReader); - } else if ((pDumpInfo->rowIndex > 0 && asc) || (pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1) && !asc)) { - // file data block is partially loaded - // todo refactor: extract method - return buildComposedDataBlock(pReader, pScanInfo); - } else { // current block is not loaded yet - return doBuildDataBlock(pReader); + + // current block is not loaded yet, or data in buffer may overlap with the file block. + code = doBuildDataBlock(pReader); + } + + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + if (pReader->pResBlock->info.rows > 0) { + return TSDB_CODE_SUCCESS; } } - - return code; } // // todo not unref yet, since it is not support multi-group interpolation query @@ -2654,24 +2710,19 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { // taosArrayPush(pTsdbReadHandle->pTableCheckInfo, &info); // } -static bool outofTimeWindow(int64_t ts, STimeWindow* pWindow, int32_t order) { - return (((ts > pWindow->ekey) && ASCENDING_TRAVERSE(order)) || ((ts < pWindow->skey) && ASCENDING_TRAVERSE(order))); -} - TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader) { if (!(*hasVal)) { return NULL; } TSDBROW* pRow = tsdbTbDataIterGet(pIter); - TSDBKEY key = TSDBROW_KEY(pRow); - if (outofTimeWindow(key.ts, &pReader->window, pReader->order)) { + if (outOfTimeWindow(key.ts, &pReader->window)) { *hasVal = false; return NULL; } - if (key.version <= pReader->verRange.maxVer) { + if (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer) { return pRow; } @@ -2684,25 +2735,31 @@ TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader) { pRow = tsdbTbDataIterGet(pIter); key = TSDBROW_KEY(pRow); - if (outofTimeWindow(key.ts, &pReader->window, pReader->order)) { + if (outOfTimeWindow(key.ts, &pReader->window)) { *hasVal = false; return NULL; } - if (key.version <= pReader->verRange.maxVer) { + if (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer) { return pRow; } } } -int32_t doLoadRowsOfIdenticalTsInBuf(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader) { +int32_t doMergeRowsInBuf(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader) { while (1) { *hasVal = tsdbTbDataIterNext(pIter); if (!(*hasVal)) { break; } + // data exists but not valid TSDBROW* pRow = getValidRow(pIter, hasVal, pReader); + if (pRow == NULL) { + break; + } + + // ts is not identical, quit TSDBKEY k = TSDBROW_KEY(pRow); if (k.ts != ts) { break; @@ -2828,7 +2885,7 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, STbDataIter* dIter, bool* has updateSchema(pRow, uid, pReader); tRowMergerInit(&merge, pRow, pReader->pSchema); - doLoadRowsOfIdenticalTsInBuf(dIter, hasVal, k.ts, &merge, pReader); + doMergeRowsInBuf(dIter, hasVal, k.ts, &merge, pReader); tRowMergerGetRow(&merge, pTSRow); } @@ -2842,18 +2899,18 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo *pBlo updateSchema(piRow, pBlockScanInfo->uid, pReader); tRowMergerInit(&merge, piRow, pReader->pSchema); - doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); + doMergeRowsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); tRowMerge(&merge, pRow); - doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); + doMergeRowsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); } else { updateSchema(pRow, pBlockScanInfo->uid, pReader); tRowMergerInit(&merge, pRow, pReader->pSchema); - doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->memHasVal, ik.ts, &merge, pReader); + doMergeRowsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->memHasVal, ik.ts, &merge, pReader); tRowMerge(&merge, piRow); - doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->imemHasVal, k.ts, &merge, pReader); + doMergeRowsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->imemHasVal, k.ts, &merge, pReader); } tRowMergerGetRow(&merge, pTSRow); @@ -3191,11 +3248,32 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl goto _err; } + SDataBlockIter* pBlockIter = &pReader->status.blockIter; + + STsdbFSState* pFState = pReader->pTsdb->fs->cState; + initFileIterator(&pReader->status.fileIter, pFState, pReader->order, pReader->idStr); + resetDataBlockIterator(&pReader->status.blockIter, pReader->order); + + // no data in files, let's try buffer in memory + if (pReader->status.fileIter.numOfFiles == 0) { + pReader->status.loadFromFile = false; + } else { + code = initForFirstBlockInFile(pReader, pBlockIter); + if ((code != TSDB_CODE_SUCCESS)/* || (pReader->status.loadFromFile == false)*/) { + return code; + } + +// code = doBuildDataBlock(pReader); +// if (code != TSDB_CODE_SUCCESS) { +// return code; +// } + } + tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr); return code; _err: - tsdbError("failed to create tsdb reader, code: %s %s", tstrerror(code), pReader->idStr); + tsdbError("failed to create data reader, code: %s %s", tstrerror(code), pReader->idStr); return code; }