fix(query): fix memory leak.

This commit is contained in:
Haojun Liao 2024-08-30 10:19:16 +08:00
parent 251bdf51c1
commit 5a5bfa5b8c
1 changed files with 21 additions and 25 deletions

View File

@ -263,17 +263,17 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock,
SSortOperatorInfo* pInfo, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
blockDataCleanup(pDataBlock);
int32_t lino = 0;
int32_t code = 0;
SSDataBlock* p = NULL;
int32_t code = tsortGetSortedDataBlock(pHandle, &p);
code = tsortGetSortedDataBlock(pHandle, &p);
if (p == NULL || (code != 0)) {
return code;
}
code = blockDataEnsureCapacity(p, capacity);
if (code) {
return code;
}
QUERY_CHECK_CODE(code, lino, _error);
STupleHandle* pTupleHandle;
while (1) {
@ -282,51 +282,40 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock,
} else {
code = tsortNextTuple(pHandle, &pTupleHandle);
}
if (pTupleHandle == NULL || code != 0) {
lino = __LINE__;
break;
}
code = appendOneRowToDataBlock(p, pTupleHandle);
if (code) {
return code;
}
QUERY_CHECK_CODE(code, lino, _error);
if (p->info.rows >= capacity) {
break;
}
}
if (TSDB_CODE_SUCCESS != code) {
return code;
}
QUERY_CHECK_CODE(code, lino, _error);
if (p->info.rows > 0) {
code = blockDataEnsureCapacity(pDataBlock, capacity);
if (code) {
return code;
}
QUERY_CHECK_CODE(code, lino, _error);
// todo extract function to handle this
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
for (int32_t i = 0; i < numOfCols; ++i) {
SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
if (pmInfo == NULL) {
return terrno;
}
QUERY_CHECK_NULL(pmInfo, code, lino, _error, terrno);
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
if (pSrc == NULL) {
return terrno;
}
QUERY_CHECK_NULL(pSrc, code, lino, _error, terrno);
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
if (pDst == NULL) {
return terrno;
}
QUERY_CHECK_NULL(pDst, code, lino, _error, terrno);
code = colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
if (code) {
return code;
}
QUERY_CHECK_CODE(code, lino, _error);
}
pDataBlock->info.dataLoad = 1;
@ -338,6 +327,12 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock,
blockDataDestroy(p);
*pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
return code;
_error:
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
blockDataDestroy(p);
return code;
}
int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) {
@ -746,6 +741,7 @@ int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
T_LONG_JMP(pOperator->pTaskInfo->env, code);
}
code = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
pInfo->matchInfo.pList, pInfo, &pBlock);
if (pBlock != NULL && (code == 0)) {