fix(stream): fix stream mem leak

This commit is contained in:
54liuyao 2025-03-10 16:28:28 +08:00
parent 38cbb5e08e
commit 051c34e2f1
5 changed files with 35 additions and 21 deletions

View File

@ -57,11 +57,12 @@ void destroyStreamCountAggOperatorInfo(void* param) {
}
destroyStreamBasicInfo(&pInfo->basic);
destroyStreamAggSupporter(&pInfo->streamAggSup);
cleanupExprSupp(&pInfo->scalarSupp);
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
pInfo->pUpdated = NULL;
destroyStreamAggSupporter(&pInfo->streamAggSup);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
blockDataDestroy(pInfo->pDelRes);

View File

@ -56,10 +56,11 @@ void destroyStreamEventOperatorInfo(void* param) {
}
destroyStreamBasicInfo(&pInfo->basic);
destroyStreamAggSupporter(&pInfo->streamAggSup);
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
pInfo->pUpdated = NULL;
destroyStreamAggSupporter(&pInfo->streamAggSup);
cleanupExprSupp(&pInfo->scalarSupp);
if (pInfo->pChildren != NULL) {

View File

@ -151,7 +151,7 @@ void destroyStreamTimeSliceOperatorInfo(void* param) {
pInfo->pOperator = NULL;
}
colDataDestroy(&pInfo->twAggSup.timeWindowData);
destroyStreamAggSupporter(&pInfo->streamAggSup);
resetPrevAndNextWindow(pInfo->pFillSup);
destroyStreamFillSupporter(pInfo->pFillSup);
destroyStreamFillInfo(pInfo->pFillInfo);
@ -179,6 +179,7 @@ void destroyStreamTimeSliceOperatorInfo(void* param) {
taosArrayDestroy(pInfo->historyWins);
taosArrayDestroy(pInfo->pCloseTs);
destroyStreamAggSupporter(&pInfo->streamAggSup);
taosMemoryFreeClear(param);
}

View File

@ -462,7 +462,7 @@ _end:
void destroyFlusedPos(void* pRes) {
SRowBuffPos* pPos = (SRowBuffPos*)pRes;
if (!pPos->needFree && !pPos->pRowBuff) {
if (pPos->needFree && !pPos->pRowBuff) {
taosMemoryFreeClear(pPos->pKey);
taosMemoryFree(pPos);
}
@ -475,10 +475,12 @@ void destroyFlusedppPos(void* ppRes) {
void clearGroupResInfo(SGroupResInfo* pGroupResInfo) {
int32_t size = taosArrayGetSize(pGroupResInfo->pRows);
if (pGroupResInfo->index >= 0 && pGroupResInfo->index < size) {
for (int32_t i = pGroupResInfo->index; i < size; i++) {
void* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
destroyFlusedPos(pPos);
}
}
pGroupResInfo->freeItem = false;
taosArrayDestroy(pGroupResInfo->pRows);
pGroupResInfo->pRows = NULL;
@ -2204,11 +2206,12 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
}
destroyStreamBasicInfo(&pInfo->basic);
destroyStreamAggSupporter(&pInfo->streamAggSup);
cleanupExprSupp(&pInfo->scalarSupp);
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
pInfo->pUpdated = NULL;
destroyStreamAggSupporter(&pInfo->streamAggSup);
if (pInfo->pChildren != NULL) {
int32_t size = taosArrayGetSize(pInfo->pChildren);
@ -4442,10 +4445,11 @@ void destroyStreamStateOperatorInfo(void* param) {
}
destroyStreamBasicInfo(&pInfo->basic);
destroyStreamAggSupporter(&pInfo->streamAggSup);
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
pInfo->pUpdated = NULL;
destroyStreamAggSupporter(&pInfo->streamAggSup);
cleanupExprSupp(&pInfo->scalarSupp);
if (pInfo->pChildren != NULL) {

View File

@ -394,14 +394,17 @@ int32_t clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlus
pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
if (pPos->beUsed == false) {
SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode);
taosMemoryFreeClear(tmp);
}
if (pPos->pRowBuff) {
i++;
}
}
}
}
qDebug("clear flushed row buff. %d rows to disk. is all:%d", listNEles(pFlushList), all);
_end:
if (code != TSDB_CODE_SUCCESS) {
@ -433,7 +436,6 @@ int32_t popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList,
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
if (pPos->beUsed == used) {
if (used && !pPos->pRowBuff) {
QUERY_CHECK_CONDITION((pPos->needFree == true), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
continue;
}
code = tdListAppend(pFlushList, &pPos);
@ -441,8 +443,10 @@ int32_t popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList,
pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
if (pPos->beUsed == false) {
SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode);
taosMemoryFreeClear(tmp);
}
if (pPos->pRowBuff) {
i++;
}
@ -511,9 +515,12 @@ int32_t clearRowBuff(SStreamFileState* pFileState) {
if (pFileState->deleteMark != INT64_MAX) {
clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false);
}
if (isListEmpty(pFileState->freeBuffs)) {
return flushRowBuff(pFileState);
do {
int32_t code = flushRowBuff(pFileState);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} while (isListEmpty(pFileState->freeBuffs) && pFileState->curRowCount == pFileState->maxRowCount);
return TSDB_CODE_SUCCESS;
}
@ -756,10 +763,10 @@ int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void**
QUERY_CHECK_CODE(code, lino, _end);
(*pVal) = pPos->pRowBuff;
if (!pPos->needFree) {
code = tdListPrepend(pFileState->usedBuffs, &pPos);
QUERY_CHECK_CODE(code, lino, _end);
}
// if (!pPos->needFree) {
// code = tdListPrepend(pFileState->usedBuffs, &pPos);
// QUERY_CHECK_CODE(code, lino, _end);
// }
_end:
if (code != TSDB_CODE_SUCCESS) {