fix(query)[TD-32360]. Fix memory leak during tsort in exceptional scenarios
When extracting data from the sort source, an error may cause a long jump to the outermost execution layer. This leads to a local variable in createBlocksQuickSortInitialSources not being released, resulting a memory leak. This fix ensures proper resource cleanup in such cases to prevent leaks.
This commit is contained in:
parent
fb28fc578a
commit
f5c9bf9b5d
|
@ -867,6 +867,7 @@ int32_t blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t r
|
|||
|
||||
code = blockDataEnsureCapacity(pDst, rowCount);
|
||||
if (code) {
|
||||
blockDataDestroy(pDst);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -2391,25 +2391,31 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static void freeSortSource(SSortSource* pSource) {
|
||||
if (NULL == pSource) {
|
||||
static void freeSortSource(void* p) {
|
||||
SSortSource** pSource = (SSortSource**)p;
|
||||
if (NULL == pSource || NULL == *pSource) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!pSource->onlyRef && pSource->param) {
|
||||
taosMemoryFree(pSource->param);
|
||||
if ((*pSource)->pageIdList) {
|
||||
taosArrayDestroy((*pSource)->pageIdList);
|
||||
}
|
||||
|
||||
if (!pSource->onlyRef && pSource->src.pBlock) {
|
||||
blockDataDestroy(pSource->src.pBlock);
|
||||
pSource->src.pBlock = NULL;
|
||||
if (!(*pSource)->onlyRef) {
|
||||
if ((*pSource)->param) {
|
||||
taosMemoryFree((*pSource)->param);
|
||||
}
|
||||
if ((*pSource)->src.pBlock) {
|
||||
blockDataDestroy((*pSource)->src.pBlock);
|
||||
}
|
||||
}
|
||||
|
||||
taosMemoryFree(pSource);
|
||||
taosMemoryFreeClear(*pSource);
|
||||
}
|
||||
|
||||
static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
||||
SSortSource** p = taosArrayGet(pHandle->pOrderedSource, 0);
|
||||
if (p == NULL) {
|
||||
|
@ -2417,17 +2423,12 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
|
|||
}
|
||||
|
||||
SSortSource* pSource = *p;
|
||||
|
||||
taosArrayRemove(pHandle->pOrderedSource, 0);
|
||||
tsortClearOrderedSource(pHandle->pOrderedSource, NULL, NULL);
|
||||
size_t origSourceCount = taosArrayGetSize(pHandle->pOrderedSource);
|
||||
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = NULL;
|
||||
code = pHandle->fetchfp(pSource->param, &pBlock);
|
||||
if (code != 0) {
|
||||
freeSortSource(pSource);
|
||||
return code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (pBlock == NULL) {
|
||||
break;
|
||||
|
@ -2441,10 +2442,7 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
|
|||
pHandle->numOfPages = 1024;
|
||||
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
||||
code = createOneDataBlock(pBlock, false, &pHandle->pDataBlock);
|
||||
if (code) {
|
||||
freeSortSource(pSource);
|
||||
return code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
if (pHandle->beforeFp != NULL) {
|
||||
|
@ -2452,43 +2450,30 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
|
|||
}
|
||||
|
||||
code = blockDataMerge(pHandle->pDataBlock, pBlock);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
freeSortSource(pSource);
|
||||
return code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
size_t size = blockDataGetSize(pHandle->pDataBlock);
|
||||
if (size > sortBufSize) {
|
||||
// Perform the in-memory sort and then flush data in the buffer into disk.
|
||||
int64_t st = taosGetTimestampUs();
|
||||
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
||||
if (code != 0) {
|
||||
freeSortSource(pSource);
|
||||
return code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
pHandle->sortElapsed += (taosGetTimestampUs() - st);
|
||||
|
||||
if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
|
||||
code = doAddToBuf(pHandle->pDataBlock, pHandle);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
freeSortSource(pSource);
|
||||
return code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
}
|
||||
|
||||
freeSortSource(pSource);
|
||||
|
||||
if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) {
|
||||
size_t size = blockDataGetSize(pHandle->pDataBlock);
|
||||
|
||||
// Perform the in-memory sort and then flush data in the buffer into disk.
|
||||
int64_t st = taosGetTimestampUs();
|
||||
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
|
||||
pHandle->sortElapsed += (taosGetTimestampUs() - st);
|
||||
|
@ -2501,12 +2486,16 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
|
|||
pHandle->loops = 1;
|
||||
pHandle->tupleHandle.rowIndex = -1;
|
||||
pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
|
||||
return 0;
|
||||
} else {
|
||||
code = doAddToBuf(pHandle->pDataBlock, pHandle);
|
||||
}
|
||||
}
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
taosArrayRemoveBatch(pHandle->pOrderedSource, 0, origSourceCount, freeSortSource);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue