Merge pull request #15421 from taosdata/feature/3_liaohj
refactor: optimize the memory consumption in tsdbread
This commit is contained in:
commit
97e3946a98
|
@ -371,8 +371,8 @@ struct SBlockIdx {
|
|||
|
||||
struct SMapData {
|
||||
int32_t nItem;
|
||||
int32_t *aOffset;
|
||||
int32_t nData;
|
||||
int32_t *aOffset;
|
||||
uint8_t *pData;
|
||||
};
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ typedef struct {
|
|||
typedef struct STableBlockScanInfo {
|
||||
uint64_t uid;
|
||||
TSKEY lastKey;
|
||||
SBlockIdx blockIdx;
|
||||
SMapData mapData; // block info (compressed)
|
||||
SArray* pBlockList; // block data index list
|
||||
SIterInfo iter; // mem buffer skip list iterator
|
||||
SIterInfo iiter; // imem buffer skip list iterator
|
||||
|
@ -42,7 +42,7 @@ typedef struct STableBlockScanInfo {
|
|||
|
||||
typedef struct SBlockOrderWrapper {
|
||||
int64_t uid;
|
||||
SBlock* pBlock;
|
||||
int64_t offset;
|
||||
} SBlockOrderWrapper;
|
||||
|
||||
typedef struct SBlockOrderSupporter {
|
||||
|
@ -53,11 +53,13 @@ typedef struct SBlockOrderSupporter {
|
|||
} SBlockOrderSupporter;
|
||||
|
||||
typedef struct SIOCostSummary {
|
||||
int64_t blockLoadTime;
|
||||
int64_t smaLoadTime;
|
||||
int64_t checkForNextTime;
|
||||
int64_t numOfBlocks;
|
||||
double blockLoadTime;
|
||||
double buildmemBlock;
|
||||
int64_t headFileLoad;
|
||||
int64_t headFileLoadTime;
|
||||
double headFileLoadTime;
|
||||
int64_t smaData;
|
||||
double smaLoadTime;
|
||||
} SIOCostSummary;
|
||||
|
||||
typedef struct SBlockLoadSuppInfo {
|
||||
|
@ -86,6 +88,8 @@ typedef struct SDataBlockIter {
|
|||
int32_t index;
|
||||
SArray* blockList; // SArray<SFileDataBlockInfo>
|
||||
int32_t order;
|
||||
SBlock block; // current SBlock data
|
||||
SHashObj* pTableMap;
|
||||
} SDataBlockIter;
|
||||
|
||||
typedef struct SFileBlockDumpInfo {
|
||||
|
@ -183,7 +187,7 @@ static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
|
|||
|
||||
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
|
||||
// allocate buffer in order to load data blocks from file
|
||||
// todo use simple hash instead
|
||||
// todo use simple hash instead, optimize the memory consumption
|
||||
SHashObj* pTableMap =
|
||||
taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
if (pTableMap == NULL) {
|
||||
|
@ -244,6 +248,7 @@ static void destroyBlockScanInfo(SHashObj* pTableMap) {
|
|||
|
||||
p->delSkyline = taosArrayDestroy(p->delSkyline);
|
||||
p->pBlockList = taosArrayDestroy(p->pBlockList);
|
||||
tMapDataClear(&p->mapData);
|
||||
}
|
||||
|
||||
taosHashCleanup(pTableMap);
|
||||
|
@ -320,6 +325,8 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
pReader->cost.headFileLoad += 1;
|
||||
|
||||
int32_t fid = pReader->status.pCurrentFileset->fid;
|
||||
tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);
|
||||
|
||||
|
@ -347,7 +354,7 @@ _err:
|
|||
return false;
|
||||
}
|
||||
|
||||
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
|
||||
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, SHashObj* pTableMap) {
|
||||
pIter->order = order;
|
||||
pIter->index = -1;
|
||||
pIter->numOfBlocks = -1;
|
||||
|
@ -356,6 +363,7 @@ static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
|
|||
} else {
|
||||
taosArrayClear(pIter->blockList);
|
||||
}
|
||||
pIter->pTableMap = pTableMap;
|
||||
}
|
||||
|
||||
static void cleanupDataBlockIterator(SDataBlockIter* pIter) { taosArrayDestroy(pIter->blockList); }
|
||||
|
@ -521,7 +529,7 @@ _end:
|
|||
// }
|
||||
|
||||
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
|
||||
SArray* aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
|
||||
SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx, NULL);
|
||||
|
@ -554,16 +562,18 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
|
|||
|
||||
STableBlockScanInfo* pScanInfo = p;
|
||||
if (pScanInfo->pBlockList == NULL) {
|
||||
pScanInfo->pBlockList = taosArrayInit(16, sizeof(SBlock));
|
||||
pScanInfo->pBlockList = taosArrayInit(4, sizeof(int32_t));
|
||||
}
|
||||
|
||||
pScanInfo->blockIdx = *pBlockIdx;
|
||||
taosArrayPush(pIndexList, pBlockIdx);
|
||||
}
|
||||
|
||||
int64_t et2 = taosGetTimestampUs();
|
||||
tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%d bytes %s",
|
||||
(int32_t)num, (et1 - st)/1000.0, (et2-et1)/1000.0, num * sizeof(SBlockIdx), pReader->idStr);
|
||||
tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
|
||||
(int32_t)num, (et1 - st)/1000.0, (et2-et1)/1000.0, num * sizeof(SBlockIdx)/1024.0, pReader->idStr);
|
||||
|
||||
pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;
|
||||
|
||||
_end:
|
||||
taosArrayDestroy(aBlockIdx);
|
||||
return code;
|
||||
|
@ -584,23 +594,22 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
|
|||
break;
|
||||
}
|
||||
|
||||
tMapDataClear(&px->mapData);
|
||||
taosArrayClear(px->pBlockList);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);
|
||||
|
||||
SMapData mapData = {0};
|
||||
tMapDataReset(&mapData);
|
||||
tsdbReadBlock(pReader->pFileReader, pBlockIdx, &mapData, NULL);
|
||||
|
||||
size += mapData.nData;
|
||||
|
||||
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));
|
||||
for (int32_t j = 0; j < mapData.nItem; ++j) {
|
||||
SBlock block = {0};
|
||||
|
||||
tMapDataGetItemByIdx(&mapData, j, &block, tGetBlock);
|
||||
tMapDataReset(&pScanInfo->mapData);
|
||||
tsdbReadBlock(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData, NULL);
|
||||
|
||||
size += pScanInfo->mapData.nData;
|
||||
for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
|
||||
SBlock block = {0};
|
||||
tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetBlock);
|
||||
|
||||
// 1. time range check
|
||||
if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
|
||||
|
@ -612,24 +621,26 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
|
|||
continue;
|
||||
}
|
||||
|
||||
void* p = taosArrayPush(pScanInfo->pBlockList, &block);
|
||||
void* p = taosArrayPush(pScanInfo->pBlockList, &j);
|
||||
if (p == NULL) {
|
||||
tMapDataClear(&mapData);
|
||||
tMapDataClear(&pScanInfo->mapData);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
(*numOfBlocks) += 1;
|
||||
}
|
||||
|
||||
tMapDataClear(&mapData);
|
||||
if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
|
||||
(*numOfValidTables) += 1;
|
||||
}
|
||||
}
|
||||
|
||||
int64_t et = taosGetTimestampUs();
|
||||
double el = (taosGetTimestampUs() - st)/1000.0;
|
||||
tsdbDebug("load block of %d tables completed, blocks:%d in %d tables, size:%.2f Kb, elapsed time:%.2f ms %s",
|
||||
numOfTables, *numOfBlocks, *numOfValidTables, size/1000.0, (et-st)/1000.0, pReader->idStr);
|
||||
numOfTables, *numOfBlocks, *numOfValidTables, size/1000.0, el, pReader->idStr);
|
||||
|
||||
pReader->cost.numOfBlocks += (*numOfBlocks);
|
||||
pReader->cost.headFileLoadTime += el;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -657,13 +668,22 @@ static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_
|
|||
}
|
||||
}
|
||||
|
||||
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
|
||||
SFileDataBlockInfo* pFBlockInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
|
||||
return pFBlockInfo;
|
||||
}
|
||||
|
||||
static SBlock* getCurrentBlock(SDataBlockIter* pBlockIter) {
|
||||
return &pBlockIter->block;
|
||||
}
|
||||
|
||||
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
||||
|
||||
SBlockData* pBlockData = &pStatus->fileBlockData;
|
||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
|
||||
SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
|
||||
SBlock* pBlock = getCurrentBlock(pBlockIter);
|
||||
SSDataBlock* pResBlock = pReader->pResBlock;
|
||||
int32_t numOfCols = blockDataGetNumOfCols(pResBlock);
|
||||
|
||||
|
@ -729,12 +749,12 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
|
|||
|
||||
setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
|
||||
|
||||
int64_t elapsedTime = (taosGetTimestampUs() - st);
|
||||
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
|
||||
pReader->cost.blockLoadTime += elapsedTime;
|
||||
|
||||
int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
|
||||
tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
|
||||
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%" PRId64 " us, %s",
|
||||
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
|
||||
pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows,
|
||||
pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr);
|
||||
|
||||
|
@ -746,27 +766,30 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
|
|||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
|
||||
SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
|
||||
SBlock* pBlock = getCurrentBlock(pBlockIter);
|
||||
|
||||
SSDataBlock* pResBlock = pReader->pResBlock;
|
||||
int32_t numOfCols = blockDataGetNumOfCols(pResBlock);
|
||||
|
||||
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||
|
||||
int32_t code = tsdbReadColData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pSupInfo->colIds, numOfCols,
|
||||
SBlockIdx blockIdx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
|
||||
int32_t code = tsdbReadColData(pReader->pFileReader, &blockIdx, pBlock, pSupInfo->colIds, numOfCols,
|
||||
pBlockData, NULL, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
int64_t elapsedTime = (taosGetTimestampUs() - st);
|
||||
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
|
||||
pReader->cost.blockLoadTime += elapsedTime;
|
||||
|
||||
pDumpInfo->allDumped = false;
|
||||
tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
|
||||
", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%" PRId64 " us, %s",
|
||||
", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
|
||||
pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
|
||||
pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_error:
|
||||
|
@ -824,7 +847,21 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v
|
|||
SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
|
||||
SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
|
||||
|
||||
return pLeftBlock->pBlock->aSubBlock[0].offset > pRightBlock->pBlock->aSubBlock[0].offset ? 1 : -1;
|
||||
return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
|
||||
}
|
||||
|
||||
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) {
|
||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
|
||||
STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
|
||||
|
||||
int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
|
||||
tMapDataGetItemByIdx(&pScanInfo->mapData, *mapDataIndex, &pBlockIter->block, tGetBlock);
|
||||
|
||||
#if 0
|
||||
qDebug("check file block, table uid:%"PRIu64" index:%d offset:%"PRId64", ", pScanInfo->uid, *mapDataIndex, pBlockIter->block.aSubBlock[0].offset);
|
||||
#endif
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
|
||||
|
@ -867,10 +904,15 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
|
|||
}
|
||||
|
||||
sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;
|
||||
SBlock block = {0};
|
||||
for (int32_t k = 0; k < num; ++k) {
|
||||
SBlockOrderWrapper wrapper = {0};
|
||||
wrapper.pBlock = (SBlock*)taosArrayGet(pTableScanInfo->pBlockList, k);
|
||||
|
||||
int32_t* mapDataIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
|
||||
tMapDataGetItemByIdx(&pTableScanInfo->mapData, *mapDataIndex, &block, tGetBlock);
|
||||
|
||||
wrapper.uid = pTableScanInfo->uid;
|
||||
wrapper.offset = block.aSubBlock[0].offset;
|
||||
|
||||
sup.pDataBlockInfo[sup.numOfTables][k] = wrapper;
|
||||
cnt++;
|
||||
|
@ -894,6 +936,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
|
|||
|
||||
pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
|
||||
cleanupBlockOrderSupporter(&sup);
|
||||
doSetCurrentBlock(pBlockIter);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -932,6 +975,8 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
|
|||
taosMemoryFree(pTree);
|
||||
|
||||
pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
|
||||
doSetCurrentBlock(pBlockIter);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -944,6 +989,8 @@ static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
|
|||
}
|
||||
|
||||
pBlockIter->index += step;
|
||||
doSetCurrentBlock(pBlockIter);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -957,11 +1004,6 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p
|
|||
(pVerRange->maxVer < pBlock->maxVersion && pVerRange->maxVer >= pBlock->minVersion);
|
||||
}
|
||||
|
||||
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
|
||||
SFileDataBlockInfo* pFBlockInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
|
||||
return pFBlockInfo;
|
||||
}
|
||||
|
||||
static SBlock* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
|
||||
int32_t* nextIndex, int32_t order) {
|
||||
bool asc = ASCENDING_TRAVERSE(order);
|
||||
|
@ -974,10 +1016,13 @@ static SBlock* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STab
|
|||
}
|
||||
|
||||
int32_t step = asc ? 1 : -1;
|
||||
|
||||
*nextIndex = pFBlockInfo->tbBlockIdx + step;
|
||||
SBlock* pNext = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
|
||||
return pNext;
|
||||
|
||||
SBlock *pBlock = taosMemoryCalloc(1, sizeof(SBlock));
|
||||
int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
|
||||
|
||||
tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *indexInMapdata, pBlock, tGetBlock);
|
||||
return pBlock;
|
||||
}
|
||||
|
||||
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
|
||||
|
@ -1015,6 +1060,7 @@ static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t
|
|||
ASSERT(pBlockInfo->uid == fblock.uid && pBlockInfo->tbBlockIdx == fblock.tbBlockIdx);
|
||||
}
|
||||
|
||||
doSetCurrentBlock(pBlockIter);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -1117,6 +1163,7 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc
|
|||
bool overlapWithNeighbor = false;
|
||||
if (pNeighbor) {
|
||||
overlapWithNeighbor = overlapWithNeighborBlock(pBlock, pNeighbor, pReader->order);
|
||||
taosMemoryFree(pNeighbor);
|
||||
}
|
||||
|
||||
// has duplicated ts of different version in this block
|
||||
|
@ -1142,11 +1189,13 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
|
|||
|
||||
setComposedBlockFlag(pReader, true);
|
||||
|
||||
int64_t elapsedTime = taosGetTimestampUs() - st;
|
||||
tsdbDebug("%p build data block from cache completed, elapsed time:%" PRId64
|
||||
" us, numOfRows:%d, numOfCols:%d, brange: %" PRId64 " - %" PRId64 " %s",
|
||||
pReader, elapsedTime, pBlock->info.rows, (int32_t)blockDataGetNumOfCols(pBlock), pBlock->info.window.skey,
|
||||
pBlock->info.window.ekey, pReader->idStr);
|
||||
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
|
||||
tsdbDebug(
|
||||
"%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange: %" PRId64
|
||||
" - %" PRId64 " %s",
|
||||
pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey, pReader->idStr);
|
||||
|
||||
pReader->cost.buildmemBlock += elapsedTime;
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1362,7 +1411,6 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
|
|||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
||||
|
||||
SRowMerger merge = {0};
|
||||
STSRow* pTSRow = NULL;
|
||||
|
||||
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
|
||||
|
@ -1384,6 +1432,8 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
|
|||
|
||||
// imem & mem are all empty, only file exist
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
|
||||
SRowMerger merge = {0};
|
||||
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||
tRowMergerGetRow(&merge, &pTSRow);
|
||||
|
@ -1408,9 +1458,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo*
|
|||
if (!isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
|
||||
pDumpInfo->rowIndex += step;
|
||||
|
||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
|
||||
SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
|
||||
|
||||
SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter);
|
||||
if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
|
||||
setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
|
||||
break;
|
||||
|
@ -1421,9 +1469,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo*
|
|||
}
|
||||
|
||||
buildComposedDataBlockImpl(pReader, pBlockScanInfo);
|
||||
|
||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
|
||||
SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
|
||||
SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter);
|
||||
|
||||
// currently loaded file data block is consumed
|
||||
if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
|
||||
|
@ -1666,7 +1712,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
|
||||
STableBlockScanInfo* pScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
|
||||
|
||||
SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
|
||||
SBlock* pBlock = getCurrentBlock(pBlockIter);
|
||||
|
||||
TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader);
|
||||
if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) {
|
||||
|
@ -1729,9 +1775,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
|
|||
|
||||
// set the correct start position in case of the first/last file block, according to the query time window
|
||||
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
|
||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
|
||||
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
|
||||
SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
|
||||
SBlock* pBlock = getCurrentBlock(pBlockIter);
|
||||
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
|
||||
|
@ -2102,6 +2146,8 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
|
|||
}
|
||||
|
||||
bool overlap = overlapWithNeighborBlock(pBlock, pNeighborBlock, pReader->order);
|
||||
taosMemoryFree(pNeighborBlock);
|
||||
|
||||
if (overlap) { // load next block
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
||||
|
@ -2152,7 +2198,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
|
|||
CHECK_FILEBLOCK_STATE st;
|
||||
|
||||
SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
|
||||
SBlock* pCurrentBlock = taosArrayGet(pScanInfo->pBlockList, pFileBlockInfo->tbBlockIdx);
|
||||
SBlock* pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);
|
||||
checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
|
||||
if (st == CHECK_FILEBLOCK_QUIT) {
|
||||
break;
|
||||
|
@ -2461,7 +2507,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
|
|||
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
|
||||
|
||||
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader->order, pReader->idStr);
|
||||
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
|
||||
resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap);
|
||||
|
||||
// no data in files, let's try buffer in memory
|
||||
if (pReader->status.fileIter.numOfFiles == 0) {
|
||||
|
@ -2477,7 +2523,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
|
|||
SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter;
|
||||
|
||||
initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader->order, pPrevReader->idStr);
|
||||
resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order);
|
||||
resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order, pReader->status.pTableMap);
|
||||
|
||||
// no data in files, let's try buffer in memory
|
||||
if (pPrevReader->status.fileIter.numOfFiles == 0) {
|
||||
|
@ -2519,6 +2565,8 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
|||
taosMemoryFree(pSupInfo->buildBuf);
|
||||
|
||||
cleanupDataBlockIterator(&pReader->status.blockIter);
|
||||
|
||||
size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
|
||||
destroyBlockScanInfo(pReader->status.pTableMap);
|
||||
blockDataDestroy(pReader->pResBlock);
|
||||
|
||||
|
@ -2528,10 +2576,12 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
|||
|
||||
SIOCostSummary* pCost = &pReader->cost;
|
||||
|
||||
tsdbDebug("%p :io-cost summary: head-file read cnt:%" PRIu64 ", head-file time:%" PRIu64 " us, statis-info:%" PRId64
|
||||
" us, datablock:%" PRId64 " us, check data:%" PRId64 " us, %s",
|
||||
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaLoadTime, pCost->blockLoadTime,
|
||||
pCost->checkForNextTime, pReader->idStr);
|
||||
tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%"PRId64" SMA-time:%.2f ms, "
|
||||
"fileBlocks:%"PRId64", fileBlocks-time:%.2f ms, build in-memory-block-time:%.2f ms, STableBlockScanInfo "
|
||||
"size:%.2f Kb %s",
|
||||
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaData, pCost->smaLoadTime,
|
||||
pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock,
|
||||
numOfTables * sizeof(STableBlockScanInfo) /1000.0, pReader->idStr);
|
||||
|
||||
taosMemoryFree(pReader->idStr);
|
||||
taosMemoryFree(pReader->pSchema);
|
||||
|
@ -2543,7 +2593,6 @@ static bool doTsdbNextDataBlock(STsdbReader* pReader) {
|
|||
SSDataBlock* pBlock = pReader->pResBlock;
|
||||
blockDataCleanup(pBlock);
|
||||
|
||||
int64_t stime = taosGetTimestampUs();
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
|
||||
if (pStatus->loadFromFile) {
|
||||
|
@ -2639,9 +2688,8 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
|
|||
}
|
||||
|
||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
|
||||
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
|
||||
SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
|
||||
|
||||
SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter);
|
||||
int64_t stime = taosGetTimestampUs();
|
||||
|
||||
SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
|
||||
|
@ -2690,12 +2738,13 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
|
|||
}
|
||||
}
|
||||
|
||||
int64_t elapsed = taosGetTimestampUs() - stime;
|
||||
double elapsed = (taosGetTimestampUs() - stime) / 1000.0;
|
||||
pReader->cost.smaLoadTime += elapsed;
|
||||
pReader->cost.smaData += 1;
|
||||
|
||||
*pBlockStatis = pSup->plist;
|
||||
|
||||
tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", elapsed time:%" PRId64 "us, %s", 0, pFBlock->uid,
|
||||
tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", elapsed time:%.2f ms, %s", 0, pFBlock->uid,
|
||||
elapsed, pReader->idStr);
|
||||
|
||||
return code;
|
||||
|
@ -2764,7 +2813,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
|||
tsdbDataFReaderClose(&pReader->pFileReader);
|
||||
|
||||
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader->order, pReader->idStr);
|
||||
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
|
||||
resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap);
|
||||
resetDataBlockScanInfo(pReader->status.pTableMap);
|
||||
|
||||
int32_t code = 0;
|
||||
|
|
|
@ -24,6 +24,8 @@ void tMapDataReset(SMapData *pMapData) {
|
|||
void tMapDataClear(SMapData *pMapData) {
|
||||
tFree((uint8_t *)pMapData->aOffset);
|
||||
tFree(pMapData->pData);
|
||||
pMapData->pData = NULL;
|
||||
pMapData->aOffset = NULL;
|
||||
}
|
||||
|
||||
int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)) {
|
||||
|
|
|
@ -1616,6 +1616,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
|
||||
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdated);
|
||||
}
|
||||
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdated,
|
||||
pInfo->pRecycledPages, pInfo->aggSup.pResultBuf);
|
||||
|
@ -1628,6 +1629,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
if (pInfo->pDelRes->info.rows > 0) {
|
||||
return pInfo->pDelRes;
|
||||
}
|
||||
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
printDataBlock(pInfo->binfo.pRes, "single interval");
|
||||
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
|
||||
|
|
Loading…
Reference in New Issue