From 7762e0ea4711a44d075dd744de6f5b28f492f5eb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 27 Apr 2023 16:10:06 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 126 ++++++++++++++----------- 1 file changed, 70 insertions(+), 56 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index a46f7a7924..66a6df0e7c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -180,6 +180,12 @@ typedef struct STsdbReaderAttr { SVersionRange verRange; } STsdbReaderAttr; +typedef struct SResultBlockInfo { + SSDataBlock* pResBlock; + bool freeBlock; + int64_t capacity; +} SResultBlockInfo; + struct STsdbReader { STsdb* pTsdb; SVersionRange verRange; @@ -187,12 +193,10 @@ struct STsdbReader { bool suspended; uint64_t suid; int16_t order; - bool freeBlock; EReadMode readMode; uint64_t rowsNum; STimeWindow window; // the primary query time window that applies to all queries - SSDataBlock* pResBlock; - int32_t capacity; + SResultBlockInfo resBlockInfo; SReaderStatus status; char* idStr; // query info handle, for debug purpose int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows @@ -205,7 +209,7 @@ struct STsdbReader { SDelFReader* pDelFReader; // the del file reader SArray* pDelIdx; // del file block index; SBlockInfoBuf blockInfoBuf; - int32_t step; + EContentData step; STsdbReader* innerReader[2]; }; @@ -727,6 +731,21 @@ void tsdbReleaseDataBlock(STsdbReader* pReader) { } } +static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacity, SSDataBlock* pResBlock, SQueryTableDataCond* pCond) { + pResBlockInfo->capacity = capacity; + pResBlockInfo->pResBlock = pResBlock; + terrno = 0; + + if (pResBlockInfo->pResBlock == NULL) { + pResBlockInfo->freeBlock = true; + pResBlockInfo->pResBlock = createResBlock(pCond, pResBlockInfo->capacity); + } else { + pResBlockInfo->freeBlock = false; + } + + return terrno; +} + static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity, SSDataBlock* pResBlock, const char* idstr) { int32_t code = 0; @@ -746,21 +765,16 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level); pReader->suid = pCond->suid; pReader->order = pCond->order; - pReader->capacity = capacity; - pReader->pResBlock = pResBlock; + pReader->idStr = (idstr != NULL) ? taosStrdup(idstr) : NULL; pReader->verRange = getQueryVerRange(pVnode, pCond, level); pReader->type = pCond->type; pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows); pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket - if (pReader->pResBlock == NULL) { - pReader->freeBlock = true; - pReader->pResBlock = createResBlock(pCond, pReader->capacity); - if (pReader->pResBlock == NULL) { - code = terrno; - goto _end; - } + code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond); + if (code != TSDB_CODE_SUCCESS) { + goto _end; } if (pCond->numOfCols <= 0) { @@ -792,7 +806,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd goto _end; } - pReader->status.pPrimaryTsCol = taosArrayGet(pReader->pResBlock->pDataBlock, pSup->slotId[0]); + pReader->status.pPrimaryTsCol = taosArrayGet(pReader->resBlockInfo.pResBlock->pDataBlock, pSup->slotId[0]); int32_t type = pReader->status.pPrimaryTsCol->info.type; if (type != TSDB_DATA_TYPE_TIMESTAMP) { tsdbError("the first column isn't primary timestamp in result block, actual: %s, %s", tDataTypes[type].name, @@ -1221,7 +1235,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { SBlockData* pBlockData = &pStatus->fileBlockData; SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SDataBlk* pBlock = getCurrentBlock(pBlockIter); - SSDataBlock* pResBlock = pReader->pResBlock; + SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; int32_t numOfOutputCols = pSupInfo->numOfCols; int32_t code = TSDB_CODE_SUCCESS; @@ -1269,8 +1283,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { endIndex += step; int32_t dumpedRows = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex); - if (dumpedRows > pReader->capacity) { // output buffer check - dumpedRows = pReader->capacity; + if (dumpedRows > pReader->resBlockInfo.capacity) { // output buffer check + dumpedRows = pReader->resBlockInfo.capacity; } int32_t i = 0; @@ -1785,7 +1799,7 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pInfo->overlapWithLastBlock = !(pBlock->maxKey.ts < tsLast || pBlock->minKey.ts > tsLast); } - pInfo->moreThanCapcity = pBlock->nRow > pReader->capacity; + pInfo->moreThanCapcity = pBlock->nRow > pReader->resBlockInfo.capacity; pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock); pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlock, &pReader->verRange); } @@ -1832,10 +1846,10 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* return TSDB_CODE_SUCCESS; } - SSDataBlock* pBlock = pReader->pResBlock; + SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; int64_t st = taosGetTimestampUs(); - int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader); + int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->resBlockInfo.capacity, pReader); blockDataUpdateTsWindow(pBlock, pReader->suppInfo.slotId[0]); pBlock->info.id.uid = pBlockScanInfo->uid; @@ -1866,7 +1880,7 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step]; if (nextKey != key) { // merge is not needed - code = doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex); + code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, pBlockData, pDumpInfo->rowIndex); if (code) { return code; } @@ -1913,7 +1927,7 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas if (hasVal) { int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); if (next1 != ts) { - code = doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow); + code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow); if (code) { return code; } @@ -1922,7 +1936,7 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas return code; } } else { - code = doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow); + code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow); if (code) { return code; } @@ -2120,7 +2134,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return code; } - code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); + code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); tsdbRowMergerClear(pMerger); @@ -2170,7 +2184,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, return code; } - code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); + code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); tsdbRowMergerClear(pMerger); @@ -2197,7 +2211,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, return code; } - code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); + code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); tsdbRowMergerClear(pMerger); @@ -2257,7 +2271,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader return code; } - code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); + code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); tsdbRowMergerClear(pMerger); @@ -2475,7 +2489,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* return code; } - code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); + code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); tsdbRowMergerClear(pMerger); @@ -2658,7 +2672,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc return code; } - code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); + code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); tsdbRowMergerClear(pMerger); @@ -2740,7 +2754,7 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock } static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo) { - SSDataBlock* pResBlock = pReader->pResBlock; + SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0; pResBlock->info.dataLoad = 1; @@ -2755,7 +2769,7 @@ static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlock static int32_t buildComposedDataBlock(STsdbReader* pReader) { int32_t code = TSDB_CODE_SUCCESS; - SSDataBlock* pResBlock = pReader->pResBlock; + SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; @@ -2777,7 +2791,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { // it is a clean block, load it directly if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) && - pBlock->nRow <= pReader->capacity) { + pBlock->nRow <= pReader->resBlockInfo.capacity) { if (asc || ((!asc) && (!hasDataInLastBlock(pLastBlockReader)))) { code = copyBlockDataToSDataBlock(pReader); if (code) { @@ -3064,7 +3078,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { return TSDB_CODE_SUCCESS; } - SSDataBlock* pResBlock = pReader->pResBlock; + SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; while (1) { // load the last data block of current table @@ -3098,7 +3112,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { return code; } - if (pResBlock->info.rows >= pReader->capacity) { + if (pResBlock->info.rows >= pReader->resBlockInfo.capacity) { break; } } @@ -3164,7 +3178,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { SBlockData* pBData = &pReader->status.fileBlockData; tBlockDataReset(pBData); - SSDataBlock* pResBlock = pReader->pResBlock; + SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr); int64_t st = taosGetTimestampUs(); @@ -3182,7 +3196,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { return code; } - if (pResBlock->info.rows >= pReader->capacity) { + if (pResBlock->info.rows >= pReader->resBlockInfo.capacity) { break; } } @@ -3197,7 +3211,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { pResBlock->info.rows, el, pReader->idStr); } } else { // whole block is required, return it directly - SDataBlockInfo* pInfo = &pReader->pResBlock->info; + SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info; pInfo->rows = pBlock->nRow; pInfo->id.uid = pScanInfo->uid; pInfo->dataLoad = 0; @@ -3373,7 +3387,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { return code; } - if (pReader->pResBlock->info.rows > 0) { + if (pReader->resBlockInfo.pResBlock->info.rows > 0) { return TSDB_CODE_SUCCESS; } @@ -3456,7 +3470,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { return code; } - if (pReader->pResBlock->info.rows > 0) { + if (pReader->resBlockInfo.pResBlock->info.rows > 0) { return TSDB_CODE_SUCCESS; } @@ -3481,7 +3495,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { return code; } - if (pReader->pResBlock->info.rows > 0) { + if (pReader->resBlockInfo.pResBlock->info.rows > 0) { return TSDB_CODE_SUCCESS; } } @@ -3534,7 +3548,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { return code; } - if (pReader->pResBlock->info.rows > 0) { + if (pReader->resBlockInfo.pResBlock->info.rows > 0) { return TSDB_CODE_SUCCESS; } } @@ -4173,7 +4187,7 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, STsdbReader* pReader) { - SSDataBlock* pBlock = pReader->pResBlock; + SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; int32_t code = TSDB_CODE_SUCCESS; do { @@ -4281,7 +4295,7 @@ void* tsdbGetIvtIdx(SMeta* pMeta) { return metaGetIvtIdx(pMeta); } -uint64_t getReaderMaxVersion(STsdbReader* pReader) { return pReader->verRange.maxVer; } +uint64_t tsdbGetReaderMaxVersion(STsdbReader* pReader) { return pReader->verRange.maxVer; } static int32_t doOpenReaderImpl(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; @@ -4484,8 +4498,8 @@ void tsdbReaderClose(STsdbReader* pReader) { } } - if (pReader->freeBlock) { - pReader->pResBlock = blockDataDestroy(pReader->pResBlock); + if (pReader->resBlockInfo.freeBlock) { + pReader->resBlockInfo.pResBlock = blockDataDestroy(pReader->resBlockInfo.pResBlock); } taosMemoryFree(pSupInfo->colId); @@ -4622,7 +4636,7 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) { pBlockScanInfo = pStatus->pTableIter == NULL ? NULL : *pStatus->pTableIter; if (pBlockScanInfo) { // save lastKey to restore memory iterator - STimeWindow w = pReader->pResBlock->info.window; + STimeWindow w = pReader->resBlockInfo.pResBlock->info.window; pBlockScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? w.ekey : w.skey; // reset current current table's data block scan info, @@ -4707,10 +4721,10 @@ int32_t tsdbReaderResume(STsdbReader* pReader) { STsdbReader* pNextReader = pReader->innerReader[1]; // we need only one row - pPrevReader->capacity = 1; + pPrevReader->resBlockInfo.capacity = 1; setSharedPtr(pPrevReader, pReader); - pNextReader->capacity = 1; + pNextReader->resBlockInfo.capacity = 1; setSharedPtr(pNextReader, pReader); code = doOpenReaderImpl(pPrevReader); @@ -4733,7 +4747,7 @@ _err: static bool tsdbReadRowsCountOnly(STsdbReader* pReader) { int32_t code = TSDB_CODE_SUCCESS; - SSDataBlock* pBlock = pReader->pResBlock; + SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; if (pReader->status.loadFromFile == false) { return false; @@ -4762,7 +4776,7 @@ static int32_t doTsdbNextDataBlock(STsdbReader* pReader, bool* hasNext) { int32_t code = TSDB_CODE_SUCCESS; // cleanup the data that belongs to the previous data block - SSDataBlock* pBlock = pReader->pResBlock; + SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; blockDataCleanup(pBlock); *hasNext = false; @@ -4947,7 +4961,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); SBlockLoadSuppInfo* pSup = &pReader->suppInfo; - if (pReader->pResBlock->info.id.uid != pFBlock->uid) { + if (pReader->resBlockInfo.pResBlock->info.id.uid != pFBlock->uid) { return TSDB_CODE_SUCCESS; } @@ -4973,8 +4987,8 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, pTsAgg->numOfNull = 0; pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID; - pTsAgg->min = pReader->pResBlock->info.window.skey; - pTsAgg->max = pReader->pResBlock->info.window.ekey; + pTsAgg->min = pReader->resBlockInfo.pResBlock->info.window.skey; + pTsAgg->max = pReader->resBlockInfo.pResBlock->info.window.ekey; // update the number of NULL data rows size_t numOfCols = pSup->numOfCols; @@ -4985,7 +4999,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, taosArrayEnsureCap(pSup->pColAgg, colsNum); } - SSDataBlock* pResBlock = pReader->pResBlock; + SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; if (pResBlock->pBlockAgg == NULL) { size_t num = taosArrayGetSize(pResBlock->pDataBlock); pResBlock->pBlockAgg = taosMemoryCalloc(num, POINTER_BYTES); @@ -5056,7 +5070,7 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) { return NULL; } - return pReader->pResBlock; + return pReader->resBlockInfo.pResBlock; } SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { @@ -5071,7 +5085,7 @@ SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { SReaderStatus* pStatus = &pTReader->status; if (pStatus->composedDataBlock) { - return pTReader->pResBlock; + return pTReader->resBlockInfo.pResBlock; } SSDataBlock* ret = doRetrieveDataBlock(pTReader);