Merge pull request #27564 from taosdata/fix/3_liaohj
fix(query): fix memory leak.
This commit is contained in:
commit
912d987184
|
@ -254,17 +254,17 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock,
|
||||||
SSortOperatorInfo* pInfo, SSDataBlock** pResBlock) {
|
SSortOperatorInfo* pInfo, SSDataBlock** pResBlock) {
|
||||||
QRY_OPTR_CHECK(pResBlock);
|
QRY_OPTR_CHECK(pResBlock);
|
||||||
blockDataCleanup(pDataBlock);
|
blockDataCleanup(pDataBlock);
|
||||||
|
int32_t lino = 0;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
SSDataBlock* p = NULL;
|
SSDataBlock* p = NULL;
|
||||||
int32_t code = tsortGetSortedDataBlock(pHandle, &p);
|
code = tsortGetSortedDataBlock(pHandle, &p);
|
||||||
if (p == NULL || (code != 0)) {
|
if (p == NULL || (code != 0)) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = blockDataEnsureCapacity(p, capacity);
|
code = blockDataEnsureCapacity(p, capacity);
|
||||||
if (code) {
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
STupleHandle* pTupleHandle;
|
STupleHandle* pTupleHandle;
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -273,51 +273,40 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock,
|
||||||
} else {
|
} else {
|
||||||
code = tsortNextTuple(pHandle, &pTupleHandle);
|
code = tsortNextTuple(pHandle, &pTupleHandle);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTupleHandle == NULL || code != 0) {
|
if (pTupleHandle == NULL || code != 0) {
|
||||||
|
lino = __LINE__;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = appendOneRowToDataBlock(p, pTupleHandle);
|
code = appendOneRowToDataBlock(p, pTupleHandle);
|
||||||
if (code) {
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (p->info.rows >= capacity) {
|
if (p->info.rows >= capacity) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
|
||||||
return code;
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
}
|
|
||||||
|
|
||||||
if (p->info.rows > 0) {
|
if (p->info.rows > 0) {
|
||||||
code = blockDataEnsureCapacity(pDataBlock, capacity);
|
code = blockDataEnsureCapacity(pDataBlock, capacity);
|
||||||
if (code) {
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
// todo extract function to handle this
|
// todo extract function to handle this
|
||||||
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
|
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
|
SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
|
||||||
if (pmInfo == NULL) {
|
QUERY_CHECK_NULL(pmInfo, code, lino, _error, terrno);
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
|
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
|
||||||
if (pSrc == NULL) {
|
QUERY_CHECK_NULL(pSrc, code, lino, _error, terrno);
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
|
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
|
||||||
if (pDst == NULL) {
|
QUERY_CHECK_NULL(pDst, code, lino, _error, terrno);
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
|
code = colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
|
||||||
if (code) {
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
return code;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pDataBlock->info.dataLoad = 1;
|
pDataBlock->info.dataLoad = 1;
|
||||||
|
@ -329,6 +318,12 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock,
|
||||||
blockDataDestroy(p);
|
blockDataDestroy(p);
|
||||||
*pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
|
*pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
|
_error:
|
||||||
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||||
|
|
||||||
|
blockDataDestroy(p);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) {
|
int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) {
|
||||||
|
@ -384,13 +379,13 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
code = tsortOpen(pInfo->pSortHandle);
|
code = tsortOpen(pInfo->pSortHandle);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
pTaskInfo->code = code;
|
||||||
|
} else {
|
||||||
|
pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs) / 1000.0;
|
||||||
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
|
OPTR_SET_OPENED(pOperator);
|
||||||
}
|
}
|
||||||
|
|
||||||
pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs) / 1000.0;
|
|
||||||
pOperator->status = OP_RES_TO_RETURN;
|
|
||||||
|
|
||||||
OPTR_SET_OPENED(pOperator);
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -735,6 +730,7 @@ int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
||||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||||
T_LONG_JMP(pOperator->pTaskInfo->env, code);
|
T_LONG_JMP(pOperator->pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
|
code = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
|
||||||
pInfo->matchInfo.pList, pInfo, &pBlock);
|
pInfo->matchInfo.pList, pInfo, &pBlock);
|
||||||
if (pBlock != NULL && (code == 0)) {
|
if (pBlock != NULL && (code == 0)) {
|
||||||
|
|
|
@ -286,23 +286,19 @@ int32_t tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize,
|
||||||
SSDataBlock* pBlock, const char* idstr, uint64_t pqMaxRows, uint32_t pqMaxTupleLength,
|
SSDataBlock* pBlock, const char* idstr, uint64_t pqMaxRows, uint32_t pqMaxTupleLength,
|
||||||
uint32_t pqSortBufSize, SSortHandle** pHandle) {
|
uint32_t pqSortBufSize, SSortHandle** pHandle) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
*pHandle = NULL;
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
QRY_OPTR_CHECK(pHandle);
|
||||||
SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
|
SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
|
||||||
if (pSortHandle == NULL) {
|
QUERY_CHECK_NULL(pSortHandle, code, lino, _err, terrno);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
|
|
||||||
pSortHandle->type = type;
|
pSortHandle->type = type;
|
||||||
pSortHandle->pageSize = pageSize;
|
pSortHandle->pageSize = pageSize;
|
||||||
pSortHandle->numOfPages = numOfPages;
|
pSortHandle->numOfPages = numOfPages;
|
||||||
pSortHandle->pSortInfo = taosArrayDup(pSortInfo, NULL);
|
pSortHandle->pSortInfo = taosArrayDup(pSortInfo, NULL);
|
||||||
if (pSortHandle->pSortInfo == NULL) {
|
QUERY_CHECK_NULL(pSortHandle->pSortInfo, code, lino, _err, terrno);
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
pSortHandle->loops = 0;
|
pSortHandle->loops = 0;
|
||||||
|
|
||||||
pSortHandle->pqMaxTupleLength = pqMaxTupleLength;
|
pSortHandle->pqMaxTupleLength = pqMaxTupleLength;
|
||||||
if (pqMaxRows != 0) {
|
if (pqMaxRows != 0) {
|
||||||
pSortHandle->pqSortBufSize = pqSortBufSize;
|
pSortHandle->pqSortBufSize = pqSortBufSize;
|
||||||
|
@ -312,18 +308,13 @@ int32_t tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize,
|
||||||
pSortHandle->forceUsePQSort = false;
|
pSortHandle->forceUsePQSort = false;
|
||||||
if (pBlock != NULL) {
|
if (pBlock != NULL) {
|
||||||
code = createOneDataBlock(pBlock, false, &pSortHandle->pDataBlock);
|
code = createOneDataBlock(pBlock, false, &pSortHandle->pDataBlock);
|
||||||
if (code) {
|
QUERY_CHECK_CODE(code, lino, _err);
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pSortHandle->mergeLimit = -1;
|
pSortHandle->mergeLimit = -1;
|
||||||
|
|
||||||
pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
|
pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
|
||||||
if (pSortHandle->pOrderedSource == NULL) {
|
QUERY_CHECK_NULL(pSortHandle->pOrderedSource, code, lino, _err, terrno);
|
||||||
code = terrno;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
pSortHandle->cmpParam.orderInfo = pSortInfo;
|
pSortHandle->cmpParam.orderInfo = pSortInfo;
|
||||||
pSortHandle->cmpParam.cmpGroupId = false;
|
pSortHandle->cmpParam.cmpGroupId = false;
|
||||||
|
@ -346,17 +337,17 @@ int32_t tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize,
|
||||||
|
|
||||||
if (idstr != NULL) {
|
if (idstr != NULL) {
|
||||||
pSortHandle->idStr = taosStrdup(idstr);
|
pSortHandle->idStr = taosStrdup(idstr);
|
||||||
if (pSortHandle->idStr == NULL) {
|
QUERY_CHECK_NULL(pSortHandle->idStr, code, lino, _err, terrno);
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
*pHandle = pSortHandle;
|
*pHandle = pSortHandle;
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsortDestroySortHandle(pSortHandle);
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
if (pSortHandle) {
|
||||||
|
tsortDestroySortHandle(pSortHandle);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue