fix(query):fix memory leak.
This commit is contained in:
parent
6c355184a9
commit
aeed234b2e
|
@ -431,6 +431,12 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
|
|||
|
||||
pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||
|
||||
code = tBlockDataInit(&pReader->status.fileBlockData);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
pReader->pResBlock = createResBlock(pCond, pReader->capacity);
|
||||
if (pReader->pResBlock == NULL) {
|
||||
code = terrno;
|
||||
|
@ -1200,8 +1206,9 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
|
|||
}
|
||||
|
||||
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
|
||||
STSRow* pTSRow, SIterInfo* pIter, int64_t key) {
|
||||
SIterInfo* pIter, int64_t key) {
|
||||
SRowMerger merge = {0};
|
||||
STSRow* pTSRow = NULL;
|
||||
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||
|
||||
|
@ -1250,6 +1257,8 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
|
||||
tRowMergerClear(&merge);
|
||||
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
|
||||
|
||||
taosMemoryFree(pTSRow);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -1411,8 +1420,6 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
|
|||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
||||
|
||||
STSRow* pTSRow = NULL;
|
||||
|
||||
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
|
||||
TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
|
||||
TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
|
||||
|
@ -1422,23 +1429,27 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
|
|||
} else {
|
||||
// imem + file
|
||||
if (pBlockScanInfo->iiter.hasVal) {
|
||||
return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, pTSRow, &pBlockScanInfo->iiter, key);
|
||||
return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key);
|
||||
}
|
||||
|
||||
// mem + file
|
||||
if (pBlockScanInfo->iter.hasVal) {
|
||||
return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, pTSRow, &pBlockScanInfo->iter, key);
|
||||
return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key);
|
||||
}
|
||||
|
||||
// imem & mem are all empty, only file exist
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
|
||||
STSRow* pTSRow = NULL;
|
||||
SRowMerger merge = {0};
|
||||
|
||||
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||
tRowMergerGetRow(&merge, &pTSRow);
|
||||
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
|
||||
|
||||
taosMemoryFree(pTSRow);
|
||||
tRowMergerClear(&merge);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
@ -1716,7 +1727,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
|
||||
TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader);
|
||||
if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) {
|
||||
tBlockDataInit(&pStatus->fileBlockData);
|
||||
tBlockDataReset(&pStatus->fileBlockData);
|
||||
tBlockDataClearData(&pStatus->fileBlockData);
|
||||
code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &pStatus->fileBlockData);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -2160,6 +2172,8 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
|
|||
setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
|
||||
|
||||
// 3. load the neighbor block, and set it to be the currently accessed file data block
|
||||
tBlockDataReset(&pStatus->fileBlockData);
|
||||
tBlockDataClearData(&pStatus->fileBlockData);
|
||||
int32_t code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &pStatus->fileBlockData);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -2563,6 +2577,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
|||
}
|
||||
}
|
||||
taosMemoryFree(pSupInfo->buildBuf);
|
||||
tBlockDataClear(&pReader->status.fileBlockData, true);
|
||||
|
||||
cleanupDataBlockIterator(&pReader->status.blockIter);
|
||||
|
||||
|
@ -2760,13 +2775,9 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
|
|||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pStatus->blockIter);
|
||||
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
|
||||
|
||||
int32_t code = tBlockDataInit(&pStatus->fileBlockData);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData);
|
||||
tBlockDataReset(&pStatus->fileBlockData);
|
||||
tBlockDataClearData(&pStatus->fileBlockData);
|
||||
int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tBlockDataClear(&pStatus->fileBlockData, 1);
|
||||
|
||||
|
@ -2775,7 +2786,6 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
|
|||
}
|
||||
|
||||
copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
|
||||
tBlockDataClear(&pStatus->fileBlockData, 1);
|
||||
return pReader->pResBlock->pDataBlock;
|
||||
}
|
||||
|
||||
|
@ -2872,9 +2882,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
|
|||
|
||||
while (true) {
|
||||
if (hasNext) {
|
||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
|
||||
STableBlockScanInfo* pScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
|
||||
SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
|
||||
SBlock* pBlock = getCurrentBlock(pBlockIter);
|
||||
|
||||
int32_t numOfRows = pBlock->nRow;
|
||||
pTableBlockInfo->totalRows += numOfRows;
|
||||
|
@ -2895,7 +2903,6 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
|
|||
pTableBlockInfo->blockRowsHisto[bucketIndex]++;
|
||||
|
||||
hasNext = blockIteratorNext(&pStatus->blockIter);
|
||||
|
||||
} else {
|
||||
code = initForFirstBlockInFile(pReader, pBlockIter);
|
||||
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
|
||||
|
|
|
@ -1022,6 +1022,10 @@ void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear) {
|
|||
tFree((uint8_t *)pBlockData->aTSKEY);
|
||||
taosArrayDestroy(pBlockData->aIdx);
|
||||
taosArrayDestroyEx(pBlockData->aColData, deepClear ? tColDataClear : NULL);
|
||||
pBlockData->aColData = NULL;
|
||||
pBlockData->aIdx = NULL;
|
||||
pBlockData->aTSKEY = NULL;
|
||||
pBlockData->aVersion = NULL;
|
||||
}
|
||||
|
||||
int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema) {
|
||||
|
|
Loading…
Reference in New Issue