Merge pull request #25453 from taosdata/fix/TD-29701

fix(stream):mem leak
This commit is contained in:
Haojun Liao 2024-04-23 19:50:07 +08:00 committed by GitHub
commit e5c679d280
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 23 additions and 10 deletions

View File

@ -944,6 +944,7 @@ int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval);
void resetUnCloseSessionWinInfo(SSHashObj* winMap);
void setStreamOperatorCompleted(struct SOperatorInfo* pOperator);
void reloadAggSupFromDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup);
void destroyFlusedPos(void* pRes);
int32_t encodeSSessionKey(void** buf, SSessionKey* key);
void* decodeSSessionKey(void* buf, SSessionKey* key);

View File

@ -50,12 +50,13 @@ void destroyStreamCountAggOperatorInfo(void* param) {
destroyStreamAggSupporter(&pInfo->streamAggSup);
cleanupExprSupp(&pInfo->scalarSupp);
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
pInfo->pUpdated = NULL;
colDataDestroy(&pInfo->twAggSup.timeWindowData);
blockDataDestroy(pInfo->pDelRes);
tSimpleHashCleanup(pInfo->pStUpdated);
tSimpleHashCleanup(pInfo->pStDeleted);
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
cleanupGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroy(pInfo->historyWins);

View File

@ -46,6 +46,9 @@ void destroyStreamEventOperatorInfo(void* param) {
cleanupBasicInfo(&pInfo->binfo);
destroyStreamAggSupporter(&pInfo->streamAggSup);
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
pInfo->pUpdated = NULL;
cleanupExprSupp(&pInfo->scalarSupp);
if (pInfo->pChildren != NULL) {
int32_t size = taosArrayGetSize(pInfo->pChildren);
@ -60,7 +63,6 @@ void destroyStreamEventOperatorInfo(void* param) {
tSimpleHashCleanup(pInfo->pSeUpdated);
tSimpleHashCleanup(pInfo->pAllUpdated);
tSimpleHashCleanup(pInfo->pSeDeleted);
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
cleanupGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroy(pInfo->historyWins);

View File

@ -388,15 +388,20 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin
}
}
void destroyFlusedPos(void* pRes) {
SRowBuffPos* pPos = (SRowBuffPos*) pRes;
if (!pPos->needFree && !pPos->pRowBuff) {
taosMemoryFreeClear(pPos->pKey);
taosMemoryFree(pPos);
}
}
void clearGroupResInfo(SGroupResInfo* pGroupResInfo) {
if (pGroupResInfo->freeItem) {
int32_t size = taosArrayGetSize(pGroupResInfo->pRows);
for (int32_t i = pGroupResInfo->index; i < size; i++) {
SRowBuffPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
if (!pPos->needFree && !pPos->pRowBuff) {
taosMemoryFreeClear(pPos->pKey);
taosMemoryFree(pPos);
}
void* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
destroyFlusedPos(pPos);
}
pGroupResInfo->freeItem = false;
}
@ -409,6 +414,8 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
cleanupBasicInfo(&pInfo->binfo);
cleanupAggSup(&pInfo->aggSup);
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
pInfo->pUpdated = NULL;
// it should be empty.
void* pIte = NULL;
@ -437,7 +444,6 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
cleanupExprSupp(&pInfo->scalarSupp);
tSimpleHashCleanup(pInfo->pUpdatedMap);
pInfo->pUpdatedMap = NULL;
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
tSimpleHashCleanup(pInfo->pDeletedMap);
blockDataDestroy(pInfo->pCheckpointRes);
@ -1664,6 +1670,8 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
destroyStreamAggSupporter(&pInfo->streamAggSup);
cleanupExprSupp(&pInfo->scalarSupp);
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
pInfo->pUpdated = NULL;
if (pInfo->pChildren != NULL) {
int32_t size = taosArrayGetSize(pInfo->pChildren);
@ -1679,7 +1687,6 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
blockDataDestroy(pInfo->pWinBlock);
tSimpleHashCleanup(pInfo->pStUpdated);
tSimpleHashCleanup(pInfo->pStDeleted);
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
cleanupGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroy(pInfo->historyWins);
@ -3250,6 +3257,9 @@ void destroyStreamStateOperatorInfo(void* param) {
cleanupBasicInfo(&pInfo->binfo);
destroyStreamAggSupporter(&pInfo->streamAggSup);
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
pInfo->pUpdated = NULL;
cleanupExprSupp(&pInfo->scalarSupp);
if (pInfo->pChildren != NULL) {
int32_t size = taosArrayGetSize(pInfo->pChildren);
@ -3263,7 +3273,6 @@ void destroyStreamStateOperatorInfo(void* param) {
blockDataDestroy(pInfo->pDelRes);
tSimpleHashCleanup(pInfo->pSeUpdated);
tSimpleHashCleanup(pInfo->pSeDeleted);
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
cleanupGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroy(pInfo->historyWins);