refactor: opt memory consumption for tsdbread.
This commit is contained in:
parent
90b3e77698
commit
231f439979
|
@ -32,6 +32,7 @@ typedef struct STableBlockScanInfo {
|
|||
uint64_t uid;
|
||||
TSKEY lastKey;
|
||||
SBlockIdx blockIdx;
|
||||
SMapData mapData; // block info (compressed)
|
||||
SArray* pBlockList; // block data index list
|
||||
SIterInfo iter; // mem buffer skip list iterator
|
||||
SIterInfo iiter; // imem buffer skip list iterator
|
||||
|
@ -42,7 +43,7 @@ typedef struct STableBlockScanInfo {
|
|||
|
||||
typedef struct SBlockOrderWrapper {
|
||||
int64_t uid;
|
||||
SBlock* pBlock;
|
||||
int64_t offset;
|
||||
} SBlockOrderWrapper;
|
||||
|
||||
typedef struct SBlockOrderSupporter {
|
||||
|
@ -53,11 +54,13 @@ typedef struct SBlockOrderSupporter {
|
|||
} SBlockOrderSupporter;
|
||||
|
||||
typedef struct SIOCostSummary {
|
||||
int64_t blockLoadTime;
|
||||
int64_t smaLoadTime;
|
||||
int64_t checkForNextTime;
|
||||
int64_t numOfBlocks;
|
||||
double blockLoadTime;
|
||||
double buildmemBlock;
|
||||
int64_t headFileLoad;
|
||||
int64_t headFileLoadTime;
|
||||
double headFileLoadTime;
|
||||
int64_t smaData;
|
||||
double smaLoadTime;
|
||||
} SIOCostSummary;
|
||||
|
||||
typedef struct SBlockLoadSuppInfo {
|
||||
|
@ -86,6 +89,8 @@ typedef struct SDataBlockIter {
|
|||
int32_t index;
|
||||
SArray* blockList; // SArray<SFileDataBlockInfo>
|
||||
int32_t order;
|
||||
SBlock block; // current SBlock data
|
||||
SHashObj* pTableMap;
|
||||
} SDataBlockIter;
|
||||
|
||||
typedef struct SFileBlockDumpInfo {
|
||||
|
@ -320,6 +325,8 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
pReader->cost.headFileLoad += 1;
|
||||
|
||||
int32_t fid = pReader->status.pCurrentFileset->fid;
|
||||
tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);
|
||||
|
||||
|
@ -347,7 +354,7 @@ _err:
|
|||
return false;
|
||||
}
|
||||
|
||||
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
|
||||
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, SHashObj* pTableMap) {
|
||||
pIter->order = order;
|
||||
pIter->index = -1;
|
||||
pIter->numOfBlocks = -1;
|
||||
|
@ -356,6 +363,7 @@ static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
|
|||
} else {
|
||||
taosArrayClear(pIter->blockList);
|
||||
}
|
||||
pIter->pTableMap = pTableMap;
|
||||
}
|
||||
|
||||
static void cleanupDataBlockIterator(SDataBlockIter* pIter) { taosArrayDestroy(pIter->blockList); }
|
||||
|
@ -554,7 +562,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
|
|||
|
||||
STableBlockScanInfo* pScanInfo = p;
|
||||
if (pScanInfo->pBlockList == NULL) {
|
||||
pScanInfo->pBlockList = taosArrayInit(16, sizeof(SBlock));
|
||||
pScanInfo->pBlockList = taosArrayInit(4, sizeof(int32_t));
|
||||
}
|
||||
|
||||
pScanInfo->blockIdx = *pBlockIdx;
|
||||
|
@ -562,8 +570,11 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
|
|||
}
|
||||
|
||||
int64_t et2 = taosGetTimestampUs();
|
||||
tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%d bytes %s",
|
||||
(int32_t)num, (et1 - st)/1000.0, (et2-et1)/1000.0, num * sizeof(SBlockIdx), pReader->idStr);
|
||||
tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
|
||||
(int32_t)num, (et1 - st)/1000.0, (et2-et1)/1000.0, num * sizeof(SBlockIdx)/1024.0, pReader->idStr);
|
||||
|
||||
pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;
|
||||
|
||||
_end:
|
||||
taosArrayDestroy(aBlockIdx);
|
||||
return code;
|
||||
|
@ -584,23 +595,22 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
|
|||
break;
|
||||
}
|
||||
|
||||
tMapDataClear(&px->mapData);
|
||||
taosArrayClear(px->pBlockList);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);
|
||||
|
||||
SMapData mapData = {0};
|
||||
tMapDataReset(&mapData);
|
||||
tsdbReadBlock(pReader->pFileReader, pBlockIdx, &mapData, NULL);
|
||||
|
||||
size += mapData.nData;
|
||||
|
||||
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));
|
||||
for (int32_t j = 0; j < mapData.nItem; ++j) {
|
||||
SBlock block = {0};
|
||||
|
||||
tMapDataGetItemByIdx(&mapData, j, &block, tGetBlock);
|
||||
tMapDataReset(&pScanInfo->mapData);
|
||||
tsdbReadBlock(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData, NULL);
|
||||
|
||||
size += pScanInfo->mapData.nData;
|
||||
for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
|
||||
SBlock block = {0};
|
||||
tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetBlock);
|
||||
|
||||
// 1. time range check
|
||||
if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
|
||||
|
@ -612,24 +622,26 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
|
|||
continue;
|
||||
}
|
||||
|
||||
void* p = taosArrayPush(pScanInfo->pBlockList, &block);
|
||||
void* p = taosArrayPush(pScanInfo->pBlockList, &j);
|
||||
if (p == NULL) {
|
||||
tMapDataClear(&mapData);
|
||||
tMapDataClear(&pScanInfo->mapData);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
(*numOfBlocks) += 1;
|
||||
}
|
||||
|
||||
tMapDataClear(&mapData);
|
||||
if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
|
||||
(*numOfValidTables) += 1;
|
||||
}
|
||||
}
|
||||
|
||||
int64_t et = taosGetTimestampUs();
|
||||
double el = (taosGetTimestampUs() - st)/1000.0;
|
||||
tsdbDebug("load block of %d tables completed, blocks:%d in %d tables, size:%.2f Kb, elapsed time:%.2f ms %s",
|
||||
numOfTables, *numOfBlocks, *numOfValidTables, size/1000.0, (et-st)/1000.0, pReader->idStr);
|
||||
numOfTables, *numOfBlocks, *numOfValidTables, size/1000.0, el, pReader->idStr);
|
||||
|
||||
pReader->cost.numOfBlocks += (*numOfBlocks);
|
||||
pReader->cost.headFileLoadTime += el;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -657,13 +669,22 @@ static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_
|
|||
}
|
||||
}
|
||||
|
||||
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
|
||||
SFileDataBlockInfo* pFBlockInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
|
||||
return pFBlockInfo;
|
||||
}
|
||||
|
||||
static SBlock* getCurrentBlock(SDataBlockIter* pBlockIter) {
|
||||
return &pBlockIter->block;
|
||||
}
|
||||
|
||||
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
||||
|
||||
SBlockData* pBlockData = &pStatus->fileBlockData;
|
||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
|
||||
SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
|
||||
SBlock* pBlock = getCurrentBlock(pBlockIter);
|
||||
SSDataBlock* pResBlock = pReader->pResBlock;
|
||||
int32_t numOfCols = blockDataGetNumOfCols(pResBlock);
|
||||
|
||||
|
@ -729,12 +750,12 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
|
|||
|
||||
setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
|
||||
|
||||
int64_t elapsedTime = (taosGetTimestampUs() - st);
|
||||
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
|
||||
pReader->cost.blockLoadTime += elapsedTime;
|
||||
|
||||
int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
|
||||
tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
|
||||
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%" PRId64 " us, %s",
|
||||
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
|
||||
pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows,
|
||||
pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr);
|
||||
|
||||
|
@ -746,7 +767,8 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
|
|||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
|
||||
SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
|
||||
SBlock* pBlock = getCurrentBlock(pBlockIter);
|
||||
|
||||
SSDataBlock* pResBlock = pReader->pResBlock;
|
||||
int32_t numOfCols = blockDataGetNumOfCols(pResBlock);
|
||||
|
||||
|
@ -759,14 +781,15 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
|
|||
goto _error;
|
||||
}
|
||||
|
||||
int64_t elapsedTime = (taosGetTimestampUs() - st);
|
||||
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
|
||||
pReader->cost.blockLoadTime += elapsedTime;
|
||||
|
||||
pDumpInfo->allDumped = false;
|
||||
tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
|
||||
", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%" PRId64 " us, %s",
|
||||
", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
|
||||
pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
|
||||
pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_error:
|
||||
|
@ -824,7 +847,21 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v
|
|||
SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
|
||||
SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
|
||||
|
||||
return pLeftBlock->pBlock->aSubBlock[0].offset > pRightBlock->pBlock->aSubBlock[0].offset ? 1 : -1;
|
||||
return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
|
||||
}
|
||||
|
||||
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) {
|
||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
|
||||
STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
|
||||
|
||||
int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
|
||||
tMapDataGetItemByIdx(&pScanInfo->mapData, *mapDataIndex, &pBlockIter->block, tGetBlock);
|
||||
|
||||
#if 0
|
||||
qDebug("check file block, table uid:%"PRIu64" index:%d offset:%"PRId64", ", pScanInfo->uid, *mapDataIndex, pBlockIter->block.aSubBlock[0].offset);
|
||||
#endif
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
|
||||
|
@ -869,8 +906,13 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
|
|||
sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;
|
||||
for (int32_t k = 0; k < num; ++k) {
|
||||
SBlockOrderWrapper wrapper = {0};
|
||||
wrapper.pBlock = (SBlock*)taosArrayGet(pTableScanInfo->pBlockList, k);
|
||||
|
||||
SBlock block = {0};
|
||||
int32_t* mapDataIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
|
||||
tMapDataGetItemByIdx(&pTableScanInfo->mapData, *mapDataIndex, &block, tGetBlock);
|
||||
|
||||
wrapper.uid = pTableScanInfo->uid;
|
||||
wrapper.offset = block.aSubBlock[0].offset;
|
||||
|
||||
sup.pDataBlockInfo[sup.numOfTables][k] = wrapper;
|
||||
cnt++;
|
||||
|
@ -894,6 +936,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
|
|||
|
||||
pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
|
||||
cleanupBlockOrderSupporter(&sup);
|
||||
doSetCurrentBlock(pBlockIter);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -932,6 +975,8 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
|
|||
taosMemoryFree(pTree);
|
||||
|
||||
pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
|
||||
doSetCurrentBlock(pBlockIter);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -944,6 +989,8 @@ static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
|
|||
}
|
||||
|
||||
pBlockIter->index += step;
|
||||
doSetCurrentBlock(pBlockIter);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -957,11 +1004,6 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p
|
|||
(pVerRange->maxVer < pBlock->maxVersion && pVerRange->maxVer >= pBlock->minVersion);
|
||||
}
|
||||
|
||||
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
|
||||
SFileDataBlockInfo* pFBlockInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
|
||||
return pFBlockInfo;
|
||||
}
|
||||
|
||||
static SBlock* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
|
||||
int32_t* nextIndex, int32_t order) {
|
||||
bool asc = ASCENDING_TRAVERSE(order);
|
||||
|
@ -974,10 +1016,13 @@ static SBlock* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STab
|
|||
}
|
||||
|
||||
int32_t step = asc ? 1 : -1;
|
||||
|
||||
*nextIndex = pFBlockInfo->tbBlockIdx + step;
|
||||
SBlock* pNext = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
|
||||
return pNext;
|
||||
|
||||
SBlock *pBlock = taosMemoryCalloc(1, sizeof(SBlock));
|
||||
int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
|
||||
|
||||
tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *indexInMapdata, pBlock, tGetBlock);
|
||||
return pBlock;
|
||||
}
|
||||
|
||||
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
|
||||
|
@ -1117,6 +1162,7 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc
|
|||
bool overlapWithNeighbor = false;
|
||||
if (pNeighbor) {
|
||||
overlapWithNeighbor = overlapWithNeighborBlock(pBlock, pNeighbor, pReader->order);
|
||||
taosMemoryFree(pNeighbor);
|
||||
}
|
||||
|
||||
// has duplicated ts of different version in this block
|
||||
|
@ -1142,11 +1188,13 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
|
|||
|
||||
setComposedBlockFlag(pReader, true);
|
||||
|
||||
int64_t elapsedTime = taosGetTimestampUs() - st;
|
||||
tsdbDebug("%p build data block from cache completed, elapsed time:%" PRId64
|
||||
" us, numOfRows:%d, numOfCols:%d, brange: %" PRId64 " - %" PRId64 " %s",
|
||||
pReader, elapsedTime, pBlock->info.rows, (int32_t)blockDataGetNumOfCols(pBlock), pBlock->info.window.skey,
|
||||
pBlock->info.window.ekey, pReader->idStr);
|
||||
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
|
||||
tsdbDebug(
|
||||
"%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange: %" PRId64
|
||||
" - %" PRId64 " %s",
|
||||
pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey, pReader->idStr);
|
||||
|
||||
pReader->cost.buildmemBlock += elapsedTime;
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1408,9 +1456,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo*
|
|||
if (!isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
|
||||
pDumpInfo->rowIndex += step;
|
||||
|
||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
|
||||
SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
|
||||
|
||||
SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter);
|
||||
if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
|
||||
setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
|
||||
break;
|
||||
|
@ -1421,9 +1467,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo*
|
|||
}
|
||||
|
||||
buildComposedDataBlockImpl(pReader, pBlockScanInfo);
|
||||
|
||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
|
||||
SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
|
||||
SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter);
|
||||
|
||||
// currently loaded file data block is consumed
|
||||
if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
|
||||
|
@ -1666,7 +1710,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
|
||||
STableBlockScanInfo* pScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
|
||||
|
||||
SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
|
||||
SBlock* pBlock = getCurrentBlock(pBlockIter);
|
||||
|
||||
TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader);
|
||||
if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) {
|
||||
|
@ -1729,9 +1773,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
|
|||
|
||||
// set the correct start position in case of the first/last file block, according to the query time window
|
||||
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
|
||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
|
||||
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
|
||||
SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
|
||||
SBlock* pBlock = getCurrentBlock(pBlockIter);
|
||||
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
|
||||
|
@ -2102,6 +2144,8 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
|
|||
}
|
||||
|
||||
bool overlap = overlapWithNeighborBlock(pBlock, pNeighborBlock, pReader->order);
|
||||
taosMemoryFree(pNeighborBlock);
|
||||
|
||||
if (overlap) { // load next block
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
||||
|
@ -2152,7 +2196,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
|
|||
CHECK_FILEBLOCK_STATE st;
|
||||
|
||||
SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
|
||||
SBlock* pCurrentBlock = taosArrayGet(pScanInfo->pBlockList, pFileBlockInfo->tbBlockIdx);
|
||||
SBlock* pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);
|
||||
checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
|
||||
if (st == CHECK_FILEBLOCK_QUIT) {
|
||||
break;
|
||||
|
@ -2461,7 +2505,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
|
|||
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
|
||||
|
||||
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader->order, pReader->idStr);
|
||||
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
|
||||
resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap);
|
||||
|
||||
// no data in files, let's try buffer in memory
|
||||
if (pReader->status.fileIter.numOfFiles == 0) {
|
||||
|
@ -2477,7 +2521,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
|
|||
SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter;
|
||||
|
||||
initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader->order, pPrevReader->idStr);
|
||||
resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order);
|
||||
resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order, pReader->status.pTableMap);
|
||||
|
||||
// no data in files, let's try buffer in memory
|
||||
if (pPrevReader->status.fileIter.numOfFiles == 0) {
|
||||
|
@ -2528,10 +2572,10 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
|||
|
||||
SIOCostSummary* pCost = &pReader->cost;
|
||||
|
||||
tsdbDebug("%p :io-cost summary: head-file read cnt:%" PRIu64 ", head-file time:%" PRIu64 " us, statis-info:%" PRId64
|
||||
" us, datablock:%" PRId64 " us, check data:%" PRId64 " us, %s",
|
||||
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaLoadTime, pCost->blockLoadTime,
|
||||
pCost->checkForNextTime, pReader->idStr);
|
||||
tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%"PRId64" SMA-time:%.2f ms, "
|
||||
"fileBlocks:%"PRId64", fileBlocks-time:%.2f ms, build in-memory-block-time:%.2f ms, %s",
|
||||
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaData, pCost->smaLoadTime,
|
||||
pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock, pReader->idStr);
|
||||
|
||||
taosMemoryFree(pReader->idStr);
|
||||
taosMemoryFree(pReader->pSchema);
|
||||
|
@ -2543,7 +2587,6 @@ static bool doTsdbNextDataBlock(STsdbReader* pReader) {
|
|||
SSDataBlock* pBlock = pReader->pResBlock;
|
||||
blockDataCleanup(pBlock);
|
||||
|
||||
int64_t stime = taosGetTimestampUs();
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
|
||||
if (pStatus->loadFromFile) {
|
||||
|
@ -2639,9 +2682,8 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
|
|||
}
|
||||
|
||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
|
||||
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
|
||||
SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
|
||||
|
||||
SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter);
|
||||
int64_t stime = taosGetTimestampUs();
|
||||
|
||||
SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
|
||||
|
@ -2690,12 +2732,13 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
|
|||
}
|
||||
}
|
||||
|
||||
int64_t elapsed = taosGetTimestampUs() - stime;
|
||||
double elapsed = (taosGetTimestampUs() - stime) / 1000.0;
|
||||
pReader->cost.smaLoadTime += elapsed;
|
||||
pReader->cost.smaData += 1;
|
||||
|
||||
*pBlockStatis = pSup->plist;
|
||||
|
||||
tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", elapsed time:%" PRId64 "us, %s", 0, pFBlock->uid,
|
||||
tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", elapsed time:%.2f ms, %s", 0, pFBlock->uid,
|
||||
elapsed, pReader->idStr);
|
||||
|
||||
return code;
|
||||
|
@ -2764,7 +2807,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
|||
tsdbDataFReaderClose(&pReader->pFileReader);
|
||||
|
||||
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader->order, pReader->idStr);
|
||||
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
|
||||
resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap);
|
||||
resetDataBlockScanInfo(pReader->status.pTableMap);
|
||||
|
||||
int32_t code = 0;
|
||||
|
|
Loading…
Reference in New Issue