fix(query): fix memory leak.

This commit is contained in:
Haojun Liao 2024-09-06 17:30:31 +08:00
parent 6af9f279bc
commit 370ba19062
2 changed files with 108 additions and 107 deletions

View File

@ -536,7 +536,7 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
_end:
(void) tStatisBlockDestroy(&block);
if (code != 0) {
tsdbError("%s error happens at:%s line number: %d, code:%s", id, __FUNCTION__, lino, tstrerror(code));
tsdbError("%s error happens at:%s line number: %d, code:%s", id, __func__, lino, tstrerror(code));
} else {
double el = (taosGetTimestampUs() - st) / 1000.0;
pBlockLoadInfo->cost.statisElapsedTime += el;

View File

@ -465,7 +465,13 @@ static int32_t doAddNewExternalMemSource(SDiskbasedBuf* pBuf, SArray* pAllSource
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
return blockDataEnsureCapacity(pSource->src.pBlock, numOfRows);
int32_t code = blockDataEnsureCapacity(pSource->src.pBlock, numOfRows);
if (code != 0) {
qError("sort failed at: %s:%d", __func__, __LINE__);
taosArrayDestroy(pPageIdList);
}
return code;
}
static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
@ -935,9 +941,99 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
}
}
}
return 0;
}
static int32_t doSortForEachGroup(SSortHandle* pHandle, int32_t sortTimes, int32_t numOfSorted,
int32_t numOfInputSources, SArray* pResList, int32_t sortGroup, int32_t numOfRows) {
int32_t code = 0;
int32_t lino = 0;
SArray* pPageIdList = NULL;
for (int32_t i = 0; i < sortGroup; ++i) {
qDebug("internal merge sort pass %d group %d. num input sources %d ", sortTimes, i, numOfInputSources);
pHandle->sourceId += 1;
int32_t end = (i + 1) * numOfInputSources - 1;
if (end > numOfSorted - 1) {
end = numOfSorted - 1;
}
pHandle->cmpParam.numOfSources = end - i * numOfInputSources + 1;
code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, i * numOfInputSources, end, pHandle);
QUERY_CHECK_CODE(code, lino, _err);
code =
tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
QUERY_CHECK_CODE(code, lino, _err);
int32_t nMergedRows = 0;
pPageIdList = taosArrayInit(4, sizeof(int32_t));
QUERY_CHECK_NULL(pPageIdList, code, lino, _err, terrno);
while (1) {
if (tsortIsClosed(pHandle) || (pHandle->abortCheckFn && pHandle->abortCheckFn(pHandle->abortCheckParam))) {
code = TSDB_CODE_TSC_QUERY_CANCELLED;
goto _err;
}
SSDataBlock* pDataBlock = NULL;
code = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows, &pDataBlock);
if (pDataBlock == NULL || code != 0) {
break;
}
int32_t pageId = -1;
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
QUERY_CHECK_NULL(pPage, code, lino, _err, terrno);
void* px = taosArrayPush(pPageIdList, &pageId);
QUERY_CHECK_NULL(px, code, lino, _err, terrno);
int32_t size =
blockDataGetSize(pDataBlock) + sizeof(int32_t) + taosArrayGetSize(pDataBlock->pDataBlock) * sizeof(int32_t);
if (size > getBufPageSize(pHandle->pBuf)) {
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
goto _err;
}
code = blockDataToBuf(pPage, pDataBlock);
QUERY_CHECK_CODE(code, lino, _err);
setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage);
nMergedRows += pDataBlock->info.rows;
blockDataCleanup(pDataBlock);
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
break;
}
}
code = sortComparCleanup(&pHandle->cmpParam);
QUERY_CHECK_CODE(code, lino, _err);
tMergeTreeDestroy(&pHandle->pMergeTree);
pHandle->numOfCompletedSources = 0;
SSDataBlock* pBlock = NULL;
code = createOneDataBlock(pHandle->pDataBlock, false, &pBlock);
QUERY_CHECK_CODE(code, lino, _err);
code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList);
QUERY_CHECK_CODE(code, lino, _err);
}
return code;
_err:
taosArrayDestroy(pPageIdList);
qError("%s error happens:%s line:%d, code:%s", pHandle->idStr, __func__, lino, tstrerror(code));
return code;
}
static int32_t doInternalMergeSort(SSortHandle* pHandle) {
size_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
if (numOfSources == 0) {
@ -959,8 +1055,8 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
pHandle->numOfPages);
}
int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize,
blockDataGetSerialMetaSize(taosArrayGetSize(pHandle->pDataBlock->pDataBlock)));
int32_t size = (int32_t) blockDataGetSerialMetaSize(taosArrayGetSize(pHandle->pDataBlock->pDataBlock));
int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, size);
if (numOfRows < 0) {
return terrno;
}
@ -985,117 +1081,22 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
int32_t sortGroup = (numOfSorted + numOfInputSources - 1) / numOfInputSources;
// Only *numOfInputSources* can be loaded into buffer to perform the external sort.
for (int32_t i = 0; i < sortGroup; ++i) {
qDebug("internal merge sort pass %d group %d. num input sources %d ", t, i, numOfInputSources);
pHandle->sourceId += 1;
int32_t end = (i + 1) * numOfInputSources - 1;
if (end > numOfSorted - 1) {
end = numOfSorted - 1;
}
pHandle->cmpParam.numOfSources = end - i * numOfInputSources + 1;
code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, i * numOfInputSources, end, pHandle);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pResList);
return code;
}
code =
tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pResList);
return code;
}
int32_t nMergedRows = 0;
SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
if (pPageIdList == NULL) {
taosArrayDestroy(pResList);
return terrno;
}
while (1) {
if (tsortIsClosed(pHandle) || (pHandle->abortCheckFn && pHandle->abortCheckFn(pHandle->abortCheckParam))) {
code = terrno = TSDB_CODE_TSC_QUERY_CANCELLED;
return code;
}
SSDataBlock* pDataBlock = NULL;
code = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows, &pDataBlock);
if (pDataBlock == NULL || code != 0) {
break;
}
int32_t pageId = -1;
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
if (pPage == NULL) {
taosArrayDestroy(pResList);
taosArrayDestroy(pPageIdList);
return terrno;
}
void* px = taosArrayPush(pPageIdList, &pageId);
if (px == NULL) {
taosArrayDestroy(pResList);
taosArrayDestroy(pPageIdList);
return terrno;
}
int32_t size =
blockDataGetSize(pDataBlock) + sizeof(int32_t) + taosArrayGetSize(pDataBlock->pDataBlock) * sizeof(int32_t);
if (size > getBufPageSize(pHandle->pBuf)) {
qError("sort failed at: %s:%d", __func__, __LINE__);
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
code= blockDataToBuf(pPage, pDataBlock);
if (code) {
return code;
}
setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage);
nMergedRows += pDataBlock->info.rows;
blockDataCleanup(pDataBlock);
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
break;
}
}
code = sortComparCleanup(&pHandle->cmpParam);
if (code) {
return code;
}
tMergeTreeDestroy(&pHandle->pMergeTree);
pHandle->numOfCompletedSources = 0;
SSDataBlock* pBlock = NULL;
code = createOneDataBlock(pHandle->pDataBlock, false, &pBlock);
if (code) {
taosArrayDestroy(pResList);
return code;
}
code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pResList); // this may cause memory leak.
return code;
}
code = doSortForEachGroup(pHandle, t, numOfSorted, numOfInputSources, pResList, sortGroup, numOfRows);
if (code != 0) {
tsortClearOrderedSource(pResList, NULL, NULL);
taosArrayDestroy(pResList);
return code;
}
tsortClearOrderedSource(pHandle->pOrderedSource, NULL, NULL);
void* px = taosArrayAddAll(pHandle->pOrderedSource, pResList);
if (px == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
tsortClearOrderedSource(pResList, NULL, NULL);
taosArrayDestroy(pResList);
return terrno;
}
taosArrayDestroy(pResList);
numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);
int64_t el = taosGetTimestampUs() - st;