Merge pull request #28170 from taosdata/fix/TD-32360-3.0
fix(query)[TD-32360]. Fix memory leak during tsort in exceptional scenarios
This commit is contained in:
commit
ef18811654
|
@ -867,6 +867,7 @@ int32_t blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t r
|
||||||
|
|
||||||
code = blockDataEnsureCapacity(pDst, rowCount);
|
code = blockDataEnsureCapacity(pDst, rowCount);
|
||||||
if (code) {
|
if (code) {
|
||||||
|
blockDataDestroy(pDst);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2391,25 +2391,31 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void freeSortSource(SSortSource* pSource) {
|
static void freeSortSource(void* p) {
|
||||||
if (NULL == pSource) {
|
SSortSource** pSource = (SSortSource**)p;
|
||||||
|
if (NULL == pSource || NULL == *pSource) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pSource->onlyRef && pSource->param) {
|
if ((*pSource)->pageIdList) {
|
||||||
taosMemoryFree(pSource->param);
|
taosArrayDestroy((*pSource)->pageIdList);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pSource->onlyRef && pSource->src.pBlock) {
|
if (!(*pSource)->onlyRef) {
|
||||||
blockDataDestroy(pSource->src.pBlock);
|
if ((*pSource)->param) {
|
||||||
pSource->src.pBlock = NULL;
|
taosMemoryFree((*pSource)->param);
|
||||||
|
}
|
||||||
|
if ((*pSource)->src.pBlock) {
|
||||||
|
blockDataDestroy((*pSource)->src.pBlock);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(pSource);
|
taosMemoryFreeClear(*pSource);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
|
static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
||||||
SSortSource** p = taosArrayGet(pHandle->pOrderedSource, 0);
|
SSortSource** p = taosArrayGet(pHandle->pOrderedSource, 0);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
|
@ -2417,17 +2423,12 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SSortSource* pSource = *p;
|
SSortSource* pSource = *p;
|
||||||
|
size_t origSourceCount = taosArrayGetSize(pHandle->pOrderedSource);
|
||||||
taosArrayRemove(pHandle->pOrderedSource, 0);
|
|
||||||
tsortClearOrderedSource(pHandle->pOrderedSource, NULL, NULL);
|
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pBlock = NULL;
|
SSDataBlock* pBlock = NULL;
|
||||||
code = pHandle->fetchfp(pSource->param, &pBlock);
|
code = pHandle->fetchfp(pSource->param, &pBlock);
|
||||||
if (code != 0) {
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
freeSortSource(pSource);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
break;
|
break;
|
||||||
|
@ -2441,10 +2442,7 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
|
||||||
pHandle->numOfPages = 1024;
|
pHandle->numOfPages = 1024;
|
||||||
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
||||||
code = createOneDataBlock(pBlock, false, &pHandle->pDataBlock);
|
code = createOneDataBlock(pBlock, false, &pHandle->pDataBlock);
|
||||||
if (code) {
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
freeSortSource(pSource);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pHandle->beforeFp != NULL) {
|
if (pHandle->beforeFp != NULL) {
|
||||||
|
@ -2452,43 +2450,30 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
code = blockDataMerge(pHandle->pDataBlock, pBlock);
|
code = blockDataMerge(pHandle->pDataBlock, pBlock);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
freeSortSource(pSource);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t size = blockDataGetSize(pHandle->pDataBlock);
|
size_t size = blockDataGetSize(pHandle->pDataBlock);
|
||||||
if (size > sortBufSize) {
|
if (size > sortBufSize) {
|
||||||
// Perform the in-memory sort and then flush data in the buffer into disk.
|
// Perform the in-memory sort and then flush data in the buffer into disk.
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
||||||
if (code != 0) {
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
freeSortSource(pSource);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
pHandle->sortElapsed += (taosGetTimestampUs() - st);
|
pHandle->sortElapsed += (taosGetTimestampUs() - st);
|
||||||
|
|
||||||
if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
|
if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
|
||||||
code = doAddToBuf(pHandle->pDataBlock, pHandle);
|
code = doAddToBuf(pHandle->pDataBlock, pHandle);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
freeSortSource(pSource);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
freeSortSource(pSource);
|
|
||||||
|
|
||||||
if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) {
|
if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) {
|
||||||
size_t size = blockDataGetSize(pHandle->pDataBlock);
|
size_t size = blockDataGetSize(pHandle->pDataBlock);
|
||||||
|
|
||||||
// Perform the in-memory sort and then flush data in the buffer into disk.
|
// Perform the in-memory sort and then flush data in the buffer into disk.
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
||||||
if (code != 0) {
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
|
if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
|
||||||
pHandle->sortElapsed += (taosGetTimestampUs() - st);
|
pHandle->sortElapsed += (taosGetTimestampUs() - st);
|
||||||
|
@ -2501,12 +2486,16 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
|
||||||
pHandle->loops = 1;
|
pHandle->loops = 1;
|
||||||
pHandle->tupleHandle.rowIndex = -1;
|
pHandle->tupleHandle.rowIndex = -1;
|
||||||
pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
|
pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
|
||||||
return 0;
|
|
||||||
} else {
|
} else {
|
||||||
code = doAddToBuf(pHandle->pDataBlock, pHandle);
|
code = doAddToBuf(pHandle->pDataBlock, pHandle);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_end:
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
|
taosArrayRemoveBatch(pHandle->pOrderedSource, 0, origSourceCount, freeSortSource);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue