diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index bc20240b4c..f4671a0f34 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -89,7 +89,7 @@ typedef struct SLastBlockReader { STimeWindow window; SVersionRange verRange; uint64_t uid; - int32_t rowIndex; + int16_t* rowIndex; // row index ptr, usually from the STableBlockScanInfo->indexInBlockL } SLastBlockReader; typedef struct SFilesetIter { @@ -181,7 +181,8 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* re int8_t* pLevel); static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level); static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader); -static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader); +static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader); +static int32_t doBuildDataBlock(STsdbReader* pReader); static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) { SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; @@ -449,7 +450,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level); pReader->suid = pCond->suid; pReader->order = pCond->order; - pReader->capacity = capacity; + pReader->capacity = 1; pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL; pReader->verRange = getQueryVerRange(pVnode, pCond, level); pReader->type = pCond->type; @@ -624,24 +625,29 @@ _end: return code; } -static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray* pLastBlockIndex, - SBlockNumber * pBlockNum, SArray* pQualifiedLastBlock) { - int32_t numOfQTable= 0; - size_t numOfTables = taosArrayGetSize(pIndexList); - - int64_t st = taosGetTimestampUs(); - size_t size = 0; - +static void cleanupTableScanInfo(SHashObj* pTableMap) { STableBlockScanInfo* px = NULL; while (1) { - px = taosHashIterate(pReader->status.pTableMap, px); + px = taosHashIterate(pTableMap, px); if (px == NULL) { break; } + // reset the index in last block when handing a new file + px->indexInBlockL = -1; tMapDataClear(&px->mapData); taosArrayClear(px->pBlockList); } +} + +static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray* pLastBlockIndex, + SBlockNumber * pBlockNum, SArray* pQualifiedLastBlock) { + int32_t numOfQTable = 0; + size_t sizeInDisk = 0; + size_t numOfTables = taosArrayGetSize(pIndexList); + + int64_t st = taosGetTimestampUs(); + cleanupTableScanInfo(pReader->status.pTableMap); for (int32_t i = 0; i < numOfTables; ++i) { SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i); @@ -651,7 +657,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray* tMapDataReset(&pScanInfo->mapData); tsdbReadBlock(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData); - size += pScanInfo->mapData.nData; + sizeInDisk += pScanInfo->mapData.nData; for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) { SBlock block = {0}; tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetBlock); @@ -707,7 +713,8 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray* double el = (taosGetTimestampUs() - st) / 1000.0; tsdbDebug("load block of %d tables completed, blocks:%d in %d tables, lastBlock:%d, size:%.2f Kb, elapsed time:%.2f ms %s", - numOfTables, total, numOfQTable, pBlockNum->numOfLastBlocks, size / 1000.0, el, pReader->idStr); + numOfTables, total, numOfQTable, pBlockNum->numOfLastBlocks, sizeInDisk + / 1000.0, el, pReader->idStr); pReader->cost.numOfBlocks += total; pReader->cost.headFileLoadTime += el; @@ -1396,7 +1403,7 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf if (minKey == tsLast) { if (!init) { init = true; - TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, pLastBlockReader->rowIndex); + TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); tRowMergerInit(&merge, &fRow1, pReader->pSchema); } @@ -1541,7 +1548,7 @@ static int32_t doMergeThreeLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo if (minKey == tsLast) { if (!init) { init = true; - TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, pLastBlockReader->rowIndex); + TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); tRowMergerInit(&merge, &fRow1, pReader->pSchema); } doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge); @@ -1747,7 +1754,7 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } static void initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid, STimeWindow* pWin, - SVersionRange* pVerRange, int16_t startPos) { + SVersionRange* pVerRange, int16_t* startPos) { pLastBlockReader->uid = uid; pLastBlockReader->window = *pWin; pLastBlockReader->verRange = *pVerRange; @@ -1755,14 +1762,14 @@ static void initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid } static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader) { - if (pLastBlockReader->rowIndex >= pLastBlockReader->lastBlockData.nRow) { + if (*(pLastBlockReader->rowIndex) >= pLastBlockReader->lastBlockData.nRow) { return false; } - pLastBlockReader->rowIndex += 1; + *(pLastBlockReader->rowIndex) += 1; SBlockData* pBlockData = &pLastBlockReader->lastBlockData; - for(int32_t i = pLastBlockReader->rowIndex; i < pBlockData->nRow; ++i) { + for(int32_t i = *(pLastBlockReader->rowIndex); i < pBlockData->nRow; ++i) { if (pBlockData->aUid[i] != pLastBlockReader->uid) { continue; } @@ -1777,23 +1784,25 @@ static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader) { // no data any more if (pBlockData->aTSKEY[i] > pLastBlockReader->window.ekey) { - pLastBlockReader->rowIndex = pBlockData->nRow; + *(pLastBlockReader->rowIndex) = pBlockData->nRow; return false; } if (pBlockData->aVersion[i] > pLastBlockReader->verRange.maxVer) { - pLastBlockReader->rowIndex = pBlockData->nRow; + *(pLastBlockReader->rowIndex) = pBlockData->nRow; return false; } - pLastBlockReader->rowIndex = i; + *(pLastBlockReader->rowIndex) = i; return true; } - pLastBlockReader->rowIndex = pBlockData->nRow; + // set all data is consumed in last block + *(pLastBlockReader->rowIndex) = pBlockData->nRow; return false; } +#if 0 static int32_t saveCurrentState(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->rowIndex; } @@ -1801,15 +1810,16 @@ static int32_t saveCurrentState(SLastBlockReader* pLastBlockReader) { static void restoreState(SLastBlockReader* pLastBlockReader, int32_t state) { pLastBlockReader->rowIndex = state; } +#endif static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) { SBlockData* pBlockData = &pLastBlockReader->lastBlockData; - return pBlockData->aTSKEY[pLastBlockReader->rowIndex]; + return pBlockData->aTSKEY[*pLastBlockReader->rowIndex]; } // todo handle desc order static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { - if (pLastBlockReader->rowIndex >= pLastBlockReader->lastBlockData.nRow) { + if (*pLastBlockReader->rowIndex >= pLastBlockReader->lastBlockData.nRow) { return false; } @@ -1847,7 +1857,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI STSRow* pTSRow = NULL; SRowMerger merge = {0}; - TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, pLastBlockReader->rowIndex); + TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); tRowMergerInit(&merge, &fRow1, pReader->pSchema); doMergeRowsInLastBlock(pLastBlockReader, ts, &merge); @@ -1898,7 +1908,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI STSRow* pTSRow = NULL; SRowMerger merge = {0}; - TSDBROW fRow = tsdbRowFromBlockData(pLastBlockData, pLastBlockReader->rowIndex); + TSDBROW fRow = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); tRowMergerInit(&merge, &fRow, pReader->pSchema); doMergeRowsInLastBlock(pLastBlockReader, tsLastBlock, &merge); @@ -1925,12 +1935,9 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { pBlockScanInfo = pReader->status.pTableIter; } - SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; - - int16_t startIndex = pBlockInfo != NULL? pBlockScanInfo->indexInBlockL:-1; - initLastBlockReader(pLastBlockReader, pBlockScanInfo->uid, &pReader->window, &pReader->verRange, startIndex); - - bool has = nextRowInLastBlock(pLastBlockReader); // todo handle if no data in last block + SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; + initLastBlockReader(pLastBlockReader, pBlockScanInfo->uid, &pReader->window, &pReader->verRange, &pBlockScanInfo->indexInBlockL); +// bool has = nextRowInLastBlock(pLastBlockReader); // todo handle if no data in last block SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SBlockData* pBlockData = &pReader->status.fileBlockData; @@ -1961,7 +1968,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader); // currently loaded file data block is consumed - if (pBlockData->nRow > 0 && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) { + if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) { SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter); setBlockAllDumped(pDumpInfo, pBlock, pReader->order); break; @@ -2252,7 +2259,7 @@ static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, uint64 } static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { - SReaderStatus* pStatus = &pReader->status; + SReaderStatus* pStatus = &pReader->status; SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader; while(1) { @@ -2261,17 +2268,42 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { if (pStatus->pTableIter == NULL) { return TSDB_CODE_SUCCESS; } - } else { // let's try next table - pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter); - if (pStatus->pTableIter == NULL) { - return TSDB_CODE_SUCCESS; + } + + // load the last data block of current table + // todo opt perf by avoiding load last block repeatly + STableBlockScanInfo* pScanInfo = pStatus->pTableIter; + int32_t code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo->uid, pReader); + if (code != TSDB_CODE_SUCCESS) { // todo handle error + return code; + } + + initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pReader->window, &pReader->verRange, &pScanInfo->indexInBlockL); + if (pScanInfo->indexInBlockL == -1) { + bool hasData = nextRowInLastBlock(pLastBlockReader); + if (!hasData) { // current table does not have rows in last block, try next table + pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter); + if (pStatus->pTableIter == NULL) { + return TSDB_CODE_SUCCESS; + } + continue; } } - // find the last block that contain the specified block uid - return doLoadRelatedLastBlock(pLastBlockReader, pStatus->pTableIter->uid, pReader); + code = doBuildDataBlock(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } - //todo check for all empty table + if (pReader->pResBlock->info.rows > 0) { + return TSDB_CODE_SUCCESS; + } + + // current table is exhausted, let's try next table + pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter); + if (pStatus->pTableIter == NULL) { + return TSDB_CODE_SUCCESS; + } } } @@ -2281,8 +2313,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; SDataBlockIter* pBlockIter = &pStatus->blockIter; - SBlock* pBlock = NULL; TSDBKEY key = {0}; + SBlock* pBlock = NULL; STableBlockScanInfo* pScanInfo = NULL; SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; @@ -2292,29 +2324,28 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { pBlock = getCurrentBlock(pBlockIter); key = getCurrentKeyInBuf(pBlockIter, pReader); + // load the last data block of current table code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo->uid, pReader); if (code != TSDB_CODE_SUCCESS) { // todo handle error } - initLastBlockReader(pLastBlockReader, pBlockInfo->uid, &pReader->window, &pReader->verRange, pScanInfo->indexInBlockL); - bool hasData = nextRowInLastBlock(pLastBlockReader); - } else { - ASSERT(pBlockIter->numOfBlocks == 0); + initLastBlockReader(pLastBlockReader, pBlockInfo->uid, &pReader->window, &pReader->verRange, &pScanInfo->indexInBlockL); } - if (pBlockInfo == NULL || fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, key, pLastBlockReader)) { - if (pBlockInfo != NULL) { - tBlockDataReset(&pStatus->fileBlockData); - code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pScanInfo->uid, pReader->pSchema); - if (code != TSDB_CODE_SUCCESS) { - //todo - } + if (pBlockInfo == NULL) { // build data block from last data file + ASSERT(pBlockIter->numOfBlocks == 0); + code = buildComposedDataBlock(pReader); + } else if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, key, pLastBlockReader)) { + tBlockDataReset(&pStatus->fileBlockData); + code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pScanInfo->uid, pReader->pSchema); + if (code != TSDB_CODE_SUCCESS) { + // todo + } - code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData); + if (code != TSDB_CODE_SUCCESS) { + return code; } // build composed data block @@ -2425,7 +2456,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { return code; } - // all data blocks are check in last file, now let's try the next file + // all data blocks are checked in this last block file, now let's try the next file if (pReader->status.pTableIter == NULL) { code = initForFirstBlockInFile(pReader, pBlockIter); @@ -2434,7 +2465,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { return code; } - // this file does not have blocks, let's start check the last block file + // this file does not have data files, let's start check the last block file if exists if (pBlockIter->numOfBlocks == 0) { goto _begin; } @@ -2855,7 +2886,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, int64_t ts, S while(nextRowInLastBlock(pLastBlockReader)) { int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); if (next1 == ts) { - TSDBROW fRow1 = tsdbRowFromBlockData(&pLastBlockReader->lastBlockData, pLastBlockReader->rowIndex); + TSDBROW fRow1 = tsdbRowFromBlockData(&pLastBlockReader->lastBlockData, *pLastBlockReader->rowIndex); tRowMerge(pMerger, &fRow1); } else { break;