fix(query)[TD-32259]. Fix error handling in merge sort

- Ensure proper release of allocated datablocks on error
- Address potential double free issue
This commit is contained in:
Jinqing Kuang 2024-09-23 14:21:29 +08:00
parent b6ede06707
commit 7f3c6c5e97
1 changed files with 34 additions and 21 deletions

View File

@ -437,41 +437,36 @@ int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource) {
static int32_t doAddNewExternalMemSource(SDiskbasedBuf* pBuf, SArray* pAllSources, SSDataBlock* pBlock,
int32_t* sourceId, SArray* pPageIdList) {
int32_t code = 0;
int32_t lino = 0;
SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource));
if (pSource == NULL) {
taosArrayDestroy(pPageIdList);
return terrno;
}
QUERY_CHECK_NULL(pSource, code, lino, _err, terrno);
pSource->src.pBlock = pBlock;
pSource->pageIdList = pPageIdList;
void* p = taosArrayPush(pAllSources, &pSource);
if (p == NULL) {
taosArrayDestroy(pPageIdList);
return terrno;
}
SSortSource** p = taosArrayPush(pAllSources, &pSource);
QUERY_CHECK_NULL(p, code, lino, _err, terrno);
pSource = NULL;
(*sourceId) += 1;
int32_t rowSize = blockDataGetSerialRowSize(pSource->src.pBlock);
int32_t rowSize = blockDataGetSerialRowSize((*p)->src.pBlock);
// The value of numOfRows must be greater than 0, which is guaranteed by the previous memory allocation
int32_t numOfRows =
(getBufPageSize(pBuf) - blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock))) / rowSize;
if (numOfRows <= 0) {
qError("sort failed at: %s:%d", __func__, __LINE__);
taosArrayDestroy(pPageIdList);
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
QUERY_CHECK_CONDITION((numOfRows > 0), code, lino, _err, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
int32_t code = blockDataEnsureCapacity(pSource->src.pBlock, numOfRows);
if (code != 0) {
qError("sort failed at: %s:%d", __func__, __LINE__);
taosArrayDestroy(pPageIdList);
}
code = blockDataEnsureCapacity((*p)->src.pBlock, numOfRows);
QUERY_CHECK_CODE(code, lino, _err);
return code;
_err:
if (pSource) taosMemoryFree(pSource);
qError("sort failed at %s:%d since %s", __func__, lino, tstrerror(code));
return code;
}
static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
@ -554,7 +549,12 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
return code;
}
return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId, pPageIdList);
code = doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId, pPageIdList);
if (code) {
blockDataDestroy(pBlock);
taosArrayDestroy(pPageIdList);
}
return code;
}
static void setCurrentSourceDone(SSortSource* pSource, SSortHandle* pHandle) {
@ -1023,6 +1023,9 @@ static int32_t doSortForEachGroup(SSortHandle* pHandle, int32_t sortTimes, int32
QUERY_CHECK_CODE(code, lino, _err);
code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList);
if (code != TSDB_CODE_SUCCESS) {
blockDataDestroy(pBlock);
}
QUERY_CHECK_CODE(code, lino, _err);
}
@ -2144,6 +2147,10 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
if (code) goto _error;
code = doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);
if (code != TSDB_CODE_SUCCESS) {
blockDataDestroy(pMemSrcBlk);
goto _error;
}
cleanupMergeSup(&sup);
tMergeTreeDestroy(&pTree);
@ -2306,9 +2313,15 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
}
code = tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES);
if (code != TSDB_CODE_SUCCESS) {
blockDataDestroy(tBlk);
}
QUERY_CHECK_CODE(code, lino, _err);
void* px = taosArrayPush(aBlkSort, &tBlk);
if (px == NULL) {
blockDataDestroy(tBlk);
}
QUERY_CHECK_NULL(px, code, lino, _err, terrno);
}
}