diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index db943645e4..42c74c4a23 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -15,7 +15,10 @@ #include "osDef.h" #include "tsdb.h" + #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) +#define ALL_ROWS_CHECKED_INDEX (INT16_MIN) +#define DEFAULT_ROW_INDEX_VAL (-1) typedef enum { EXTERNAL_ROWS_PREV = 0x1, @@ -220,7 +223,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK } for (int32_t j = 0; j < numOfTables; ++j) { - STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid, .indexInBlockL = -1}; + STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid, .indexInBlockL = DEFAULT_ROW_INDEX_VAL}; if (ASCENDING_TRAVERSE(pTsdbReader->order)) { if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReader->window.skey) { info.lastKey = pTsdbReader->window.skey; @@ -699,10 +702,10 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray* } { - // 1. time range check, todo add later -// if (pLastBlock->.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) { -// continue; -// } + // 1. time range check + if (pLastBlock->minKey > pReader->window.ekey || pLastBlock->maxKey < pReader->window.skey) { + continue; + } // 2. version range check if (pLastBlock->minVer > pReader->verRange.maxVer || pLastBlock->maxVer < pReader->verRange.minVer) { @@ -727,12 +730,10 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray* return TSDB_CODE_SUCCESS; } -// todo remove pblock parameter -static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, SBlock* pBlock, int32_t order) { +static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) { int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1; - pDumpInfo->allDumped = true; - pDumpInfo->lastKey = pBlock->maxKey.ts + step; + pDumpInfo->lastKey = maxKey + step; } static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal, @@ -832,7 +833,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn pResBlock->info.rows = remain; pDumpInfo->rowIndex += step * remain; - setBlockAllDumped(pDumpInfo, pBlock, pReader->order); + setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order); double elapsedTime = (taosGetTimestampUs() - st) / 1000.0; pReader->cost.blockLoadTime += elapsedTime; @@ -1289,7 +1290,7 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc // todo here we need to each key in the last files to identify if it is really overlapped with last block bool overlapWithlastBlock = false; - if (/*hasDataInLastBlock(pLastBlockReader)*/taosArrayGetSize(pLastBlockReader->pBlockL) > 0) { + if (taosArrayGetSize(pLastBlockReader->pBlockL) > 0) { SBlockL *pBlockL = taosArrayGet(pLastBlockReader->pBlockL, pLastBlockReader->currentBlockIndex); overlapWithlastBlock = !(pBlock->maxKey.ts < pBlockL->minKey || pBlock->minKey.ts > pBlockL->maxKey); } @@ -1417,7 +1418,8 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf bool init = false; - // file block ---> last block -----> imem -----> mem + // ASC: file block ---> last block -----> imem -----> mem + //DESC: mem -----> imem -----> last block -----> file block if (pReader->order == TSDB_ORDER_ASC) { if (minKey == key) { init = true; @@ -1549,8 +1551,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return TSDB_CODE_SUCCESS; } -// todo handle the desc order check -static int32_t doMergeThreeLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) { +static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) { SRowMerger merge = {0}; STSRow* pTSRow = NULL; @@ -1569,56 +1570,114 @@ static int32_t doMergeThreeLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY ik = TSDBROW_KEY(piRow); - int64_t minKey = INT64_MAX; - if (minKey > k.ts) { - minKey = k.ts; + int64_t minKey = 0;//INT64_MAX; + if (ASCENDING_TRAVERSE(pReader->order)) { + minKey = INT64_MAX; // let's find the minimum + if (minKey > k.ts) { + minKey = k.ts; + } + + if (minKey > ik.ts) { + minKey = ik.ts; + } + + if (minKey > key && pBlockData->nRow > 0) { + minKey = key; + } + + if (minKey > tsLast && pLastBlockData->nRow > 0) { + minKey = tsLast; + } + } else { + minKey = INT64_MIN; // let find the maximum ts value + if (minKey < k.ts) { + minKey = k.ts; + } + + if (minKey < ik.ts) { + minKey = ik.ts; + } + + if (minKey < key && pBlockData->nRow > 0) { + minKey = key; + } + + if (minKey < tsLast && pLastBlockData->nRow > 0) { + minKey = tsLast; + } } - if (minKey > ik.ts) { - minKey = ik.ts; - } - - if (minKey > key) { - minKey = key; - } - - if (minKey > tsLast) { - minKey = tsLast; - } - - // file block ---> last block -----> imem -----> mem bool init = false; - if (minKey == key) { - init = true; - TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - tRowMergerInit(&merge, &fRow, pReader->pSchema); - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); - } - if (minKey == tsLast) { - if (!init) { + // ASC: file block -----> last block -----> imem -----> mem + // DESC: mem -----> imem -----> last block -----> file block + if (ASCENDING_TRAVERSE(pReader->order)) { + if (minKey == key) { init = true; - TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); - tRowMergerInit(&merge, &fRow1, pReader->pSchema); + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + tRowMergerInit(&merge, &fRow, pReader->pSchema); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); } - doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge); - } - if (minKey == ik.ts) { - if (!init) { + if (minKey == tsLast) { + if (!init) { + init = true; + TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); + tRowMergerInit(&merge, &fRow1, pReader->pSchema); + } + doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge); + } + + if (minKey == ik.ts) { + if (!init) { + init = true; + STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); + tRowMergerInit(&merge, piRow, pSchema); + } + doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader); + } + + if (minKey == k.ts) { + if (!init) { + STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); + tRowMergerInit(&merge, pRow, pSchema); + } + doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + } + } else { + if (minKey == k.ts) { init = true; - STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); - tRowMergerInit(&merge, piRow, pSchema); - } - doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader); - } - - if (minKey == k.ts) { - if (!init) { STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); tRowMergerInit(&merge, pRow, pSchema); + doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + } + + if (minKey == ik.ts) { + if (!init) { + init = true; + STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); + tRowMergerInit(&merge, piRow, pSchema); + } + doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader); + } + + if (minKey == tsLast) { + if (!init) { + init = true; + TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); + tRowMergerInit(&merge, &fRow1, pReader->pSchema); + } + doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge); + } + + if (minKey == key) { + if (!init) { + init = true; + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + tRowMergerInit(&merge, &fRow, pReader->pSchema); + } + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); } - doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); } tRowMergerGetRow(&merge, &pTSRow); @@ -1816,7 +1875,6 @@ static void initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid } } -#define ALL_ROWS_CHECKED_INDEX INT16_MIN static void setAllRowsChecked(SLastBlockReader *pLastBlockReader) { *pLastBlockReader->rowIndex = ALL_ROWS_CHECKED_INDEX; } @@ -1863,16 +1921,6 @@ static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader) { return false; } -#if 0 -static int32_t saveCurrentState(SLastBlockReader* pLastBlockReader) { - return pLastBlockReader->rowIndex; -} - -static void restoreState(SLastBlockReader* pLastBlockReader, int32_t state) { - pLastBlockReader->rowIndex = state; -} -#endif - static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) { SBlockData* pBlockData = &pLastBlockReader->lastBlockData; return pBlockData->aTSKEY[*pLastBlockReader->rowIndex]; @@ -1894,7 +1942,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) { - return doMergeThreeLevelRowsRv(pReader, pBlockScanInfo, pBlockData, pLastBlockReader); + return doMergeMultiLevelRowsRv(pReader, pBlockScanInfo, pBlockData, pLastBlockReader); } else { // imem + file + last block if (pBlockScanInfo->iiter.hasVal) { @@ -2014,7 +2062,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { } SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; - initLastBlockReader(pLastBlockReader, pBlockScanInfo->uid, &pBlockScanInfo->indexInBlockL); +// initLastBlockReader(pLastBlockReader, pBlockScanInfo->uid, &pBlockScanInfo->indexInBlockL); // bool has = nextRowInLastBlock(pLastBlockReader); // todo handle if no data in last block SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; @@ -2026,19 +2074,27 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { while (1) { // todo check the validate of row in file block { - if (pBlockData->nRow > 0 && !isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) { + bool hasBlockData = false; + + while (pBlockData->nRow > 0) { + if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) { + hasBlockData = true; + break; + } + pDumpInfo->rowIndex += step; SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter); if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) { - setBlockAllDumped(pDumpInfo, pBlock, pReader->order); + setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order); break; } - - continue; } - if (!hasDataInLastBlock(pLastBlockReader)) { + bool hasBlockLData = hasDataInLastBlock(pLastBlockReader); + + // no data in last block and block, no need to proceed. + if ((hasBlockData == false) && (hasBlockLData == false)) { break; } } @@ -2048,7 +2104,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { // currently loaded file data block is consumed if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) { SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter); - setBlockAllDumped(pDumpInfo, pBlock, pReader->order); + setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order); break; } @@ -2218,11 +2274,11 @@ _err: return code; } -static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pReader) { +static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { TSDBKEY key = {.ts = TSKEY_INITIAL_VAL}; - SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); - STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); +// SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); +// STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); initMemDataIterator(pScanInfo, pReader); TSDBROW* pRow = getValidRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader); @@ -2321,16 +2377,17 @@ static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, uint64 code = tBlockDataInit(&pLastBlockReader->lastBlockData, pReader->suid, pReader->suid ? 0 : uid, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { - //todo add log + tsdbError("%p init block data failed, code:%s %s", pReader, tstrerror(code), pReader->idStr); return code; } code = tsdbReadLastBlock(pReader->pFileReader, pBlock, &pLastBlockReader->lastBlockData); if (code != TSDB_CODE_SUCCESS) { - // tsdbDebug("%p error occurs in loading last block into buffer, last block index:%d, total:%d brange:%" PRId64 "-%" PRId64 - // ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", code:%s %s", - // pReader, *index, pBlockIter->numOfBlocks.numOfLastBlocks, 0, 0, pBlock->nRow, - // pBlock->minVer, pBlock->maxVer, tstrerror(code), pReader->idStr); + tsdbError( + "%p error occurs in loading last block into buffer, last block index:%d, total:%d rows:%d, minVer:%" PRId64 + ", maxVer:%" PRId64 ", code:%s %s", + pReader, pLastBlockReader->currentBlockIndex, (int32_t)taosArrayGetSize(pBlocks), pBlock->nRow, pBlock->minVer, + pBlock->maxVer, tstrerror(code), pReader->idStr); } return TSDB_CODE_SUCCESS; @@ -2357,7 +2414,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { } initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL); - if (pScanInfo->indexInBlockL == -1 || pScanInfo->indexInBlockL == pLastBlockReader->lastBlockData.nRow) { + if (pScanInfo->indexInBlockL == DEFAULT_ROW_INDEX_VAL || pScanInfo->indexInBlockL == pLastBlockReader->lastBlockData.nRow) { bool hasData = nextRowInLastBlock(pLastBlockReader); if (!hasData) { // current table does not have rows in last block, try next table pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter); @@ -2398,9 +2455,17 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; if (pBlockInfo != NULL) { - pScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + } else { + pScanInfo = pReader->status.pTableIter; + } + + if (pBlockInfo != NULL) { pBlock = getCurrentBlock(pBlockIter); - key = getCurrentKeyInBuf(pBlockIter, pReader); + } + + { + key = getCurrentKeyInBuf(pScanInfo, pReader); // load the last data block of current table code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo->uid, pReader); @@ -2408,7 +2473,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { return code; } - initLastBlockReader(pLastBlockReader, pBlockInfo->uid, &pScanInfo->indexInBlockL); + initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL); + if (pScanInfo->indexInBlockL == DEFAULT_ROW_INDEX_VAL || pScanInfo->indexInBlockL == pLastBlockReader->lastBlockData.nRow) { + bool hasData = nextRowInLastBlock(pLastBlockReader); + } } if (pBlockInfo == NULL) { // build data block from last data file @@ -2439,7 +2507,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { pInfo->uid = pScanInfo->uid; pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts}; setComposedBlockFlag(pReader, false); - setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock, pReader->order); + setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order); } return code; @@ -2663,39 +2731,6 @@ SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_ return (SVersionRange){.minVer = startVer, .maxVer = endVer}; } -// // todo not unref yet, since it is not support multi-group interpolation query -// static UNUSED_FUNC void changeQueryHandleForInterpQuery(STsdbReader* pHandle) { -// // filter the queried time stamp in the first place -// STsdbReader* pTsdbReadHandle = (STsdbReader*)pHandle; - -// // starts from the buffer in case of descending timestamp order check data blocks -// size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); - -// int32_t i = 0; -// while (i < numOfTables) { -// STableBlockScanInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i); - -// // the first qualified table for interpolation query -// // if ((pTsdbReadHandle->window.skey <= pCheckInfo->pTableObj->lastKey) && -// // (pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL)) { -// // break; -// // } - -// i++; -// } - -// // there are no data in all the tables -// if (i == numOfTables) { -// return; -// } - -// STableBlockScanInfo info = *(STableBlockScanInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i); -// taosArrayClear(pTsdbReadHandle->pTableCheckInfo); - -// info.lastKey = pTsdbReadHandle->window.skey; -// taosArrayPush(pTsdbReadHandle->pTableCheckInfo, &info); -// } - bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order) { ASSERT(pKey != NULL); if (pDelList == NULL) { @@ -3227,7 +3262,7 @@ int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) { ASSERT(pReader != NULL); taosHashClear(pReader->status.pTableMap); - STableBlockScanInfo info = {.lastKey = 0, .uid = uid}; + STableBlockScanInfo info = {.lastKey = 0, .uid = uid, .indexInBlockL = DEFAULT_ROW_INDEX_VAL}; taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)); return TDB_CODE_SUCCESS; }