fix(stream): fix mem leak && rebuild event window when the window end flag is modified

This commit is contained in:
54liuyao 2025-02-27 11:23:40 +08:00
parent 58b4c6b47e
commit 9d3a00920e
2 changed files with 10 additions and 7 deletions

View File

@ -234,6 +234,11 @@ int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pW
pWin->ekey = pTsData[i]; pWin->ekey = pTsData[i];
pWinInfo->pWinFlag->endFlag = ends[i]; pWinInfo->pWinFlag->endFlag = ends[i];
} else if (pWin->ekey == pTsData[i]) { } else if (pWin->ekey == pTsData[i]) {
if (pWinInfo->pWinFlag->endFlag == true && ends[i] == false) {
(*pWinRow) = i + 1 - start;
*pRebuild = true;
goto _end;
}
pWinInfo->pWinFlag->endFlag |= ends[i]; pWinInfo->pWinFlag->endFlag |= ends[i];
} else if (ends[i] && !pWinInfo->pWinFlag->endFlag) { } else if (ends[i] && !pWinInfo->pWinFlag->endFlag) {
*pRebuild = true; *pRebuild = true;

View File

@ -479,14 +479,12 @@ void destroyFlusedppPos(void* ppRes) {
} }
void clearGroupResInfo(SGroupResInfo* pGroupResInfo) { void clearGroupResInfo(SGroupResInfo* pGroupResInfo) {
if (pGroupResInfo->freeItem) { int32_t size = taosArrayGetSize(pGroupResInfo->pRows);
int32_t size = taosArrayGetSize(pGroupResInfo->pRows); for (int32_t i = pGroupResInfo->index; i < size; i++) {
for (int32_t i = pGroupResInfo->index; i < size; i++) { void* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
void* pPos = taosArrayGetP(pGroupResInfo->pRows, i); destroyFlusedPos(pPos);
destroyFlusedPos(pPos);
}
pGroupResInfo->freeItem = false;
} }
pGroupResInfo->freeItem = false;
taosArrayDestroy(pGroupResInfo->pRows); taosArrayDestroy(pGroupResInfo->pRows);
pGroupResInfo->pRows = NULL; pGroupResInfo->pRows = NULL;
pGroupResInfo->index = 0; pGroupResInfo->index = 0;