diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index b8c3ec90f9..5066107f3c 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -56,11 +56,12 @@ void destroyStreamCountAggOperatorInfo(void* param) { &pInfo->groupResInfo); pInfo->pOperator = NULL; } - destroyStreamAggSupporter(&pInfo->streamAggSup); + cleanupExprSupp(&pInfo->scalarSupp); clearGroupResInfo(&pInfo->groupResInfo); taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); pInfo->pUpdated = NULL; + destroyStreamAggSupporter(&pInfo->streamAggSup); colDataDestroy(&pInfo->twAggSup.timeWindowData); blockDataDestroy(pInfo->pDelRes); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index a2304a2e6c..a70e67ad04 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -53,10 +53,11 @@ void destroyStreamEventOperatorInfo(void* param) { &pInfo->groupResInfo); pInfo->pOperator = NULL; } - destroyStreamAggSupporter(&pInfo->streamAggSup); + clearGroupResInfo(&pInfo->groupResInfo); taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); pInfo->pUpdated = NULL; + destroyStreamAggSupporter(&pInfo->streamAggSup); cleanupExprSupp(&pInfo->scalarSupp); if (pInfo->pChildren != NULL) { diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index 44004a4c6b..7eadf3405a 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -151,7 +151,7 @@ void destroyStreamTimeSliceOperatorInfo(void* param) { pInfo->pOperator = NULL; } colDataDestroy(&pInfo->twAggSup.timeWindowData); - destroyStreamAggSupporter(&pInfo->streamAggSup); + resetPrevAndNextWindow(pInfo->pFillSup); destroyStreamFillSupporter(pInfo->pFillSup); destroyStreamFillInfo(pInfo->pFillInfo); @@ -179,6 +179,7 @@ void destroyStreamTimeSliceOperatorInfo(void* param) { taosArrayDestroy(pInfo->historyWins); taosArrayDestroy(pInfo->pCloseTs); + destroyStreamAggSupporter(&pInfo->streamAggSup); taosMemoryFreeClear(param); } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 3b6c1963dc..0c30266ba9 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -467,7 +467,7 @@ _end: void destroyFlusedPos(void* pRes) { SRowBuffPos* pPos = (SRowBuffPos*)pRes; - if (!pPos->needFree && !pPos->pRowBuff) { + if (pPos->needFree && !pPos->pRowBuff) { taosMemoryFreeClear(pPos->pKey); taosMemoryFree(pPos); } @@ -480,9 +480,11 @@ void destroyFlusedppPos(void* ppRes) { void clearGroupResInfo(SGroupResInfo* pGroupResInfo) { int32_t size = taosArrayGetSize(pGroupResInfo->pRows); - for (int32_t i = pGroupResInfo->index; i < size; i++) { - void* pPos = taosArrayGetP(pGroupResInfo->pRows, i); - destroyFlusedPos(pPos); + if (pGroupResInfo->index >= 0 && pGroupResInfo->index < size) { + for (int32_t i = pGroupResInfo->index; i < size; i++) { + void* pPos = taosArrayGetP(pGroupResInfo->pRows, i); + destroyFlusedPos(pPos); + } } pGroupResInfo->freeItem = false; taosArrayDestroy(pGroupResInfo->pRows); @@ -2141,11 +2143,12 @@ void destroyStreamSessionAggOperatorInfo(void* param) { &pInfo->groupResInfo); pInfo->pOperator = NULL; } - destroyStreamAggSupporter(&pInfo->streamAggSup); + cleanupExprSupp(&pInfo->scalarSupp); clearGroupResInfo(&pInfo->groupResInfo); taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); pInfo->pUpdated = NULL; + destroyStreamAggSupporter(&pInfo->streamAggSup); if (pInfo->pChildren != NULL) { int32_t size = taosArrayGetSize(pInfo->pChildren); @@ -4251,10 +4254,11 @@ void destroyStreamStateOperatorInfo(void* param) { &pInfo->groupResInfo); pInfo->pOperator = NULL; } - destroyStreamAggSupporter(&pInfo->streamAggSup); + clearGroupResInfo(&pInfo->groupResInfo); taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); pInfo->pUpdated = NULL; + destroyStreamAggSupporter(&pInfo->streamAggSup); cleanupExprSupp(&pInfo->scalarSupp); if (pInfo->pChildren != NULL) { diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index d6dfde1ee6..0255ac01b5 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -394,14 +394,17 @@ int32_t clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlus pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey)); pFileState->stateBuffRemoveByPosFn(pFileState, pPos); - SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode); - taosMemoryFreeClear(tmp); + if (pPos->beUsed == false) { + SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode); + taosMemoryFreeClear(tmp); + } if (pPos->pRowBuff) { i++; } } } } + qDebug("clear flushed row buff. %d rows to disk. is all:%d", listNEles(pFlushList), all); _end: if (code != TSDB_CODE_SUCCESS) { @@ -433,7 +436,6 @@ int32_t popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; if (pPos->beUsed == used) { if (used && !pPos->pRowBuff) { - QUERY_CHECK_CONDITION((pPos->needFree == true), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); continue; } code = tdListAppend(pFlushList, &pPos); @@ -441,8 +443,10 @@ int32_t popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey)); pFileState->stateBuffRemoveByPosFn(pFileState, pPos); - SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode); - taosMemoryFreeClear(tmp); + if (pPos->beUsed == false) { + SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode); + taosMemoryFreeClear(tmp); + } if (pPos->pRowBuff) { i++; } @@ -511,9 +515,12 @@ int32_t clearRowBuff(SStreamFileState* pFileState) { if (pFileState->deleteMark != INT64_MAX) { clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false); } - if (isListEmpty(pFileState->freeBuffs)) { - return flushRowBuff(pFileState); - } + do { + int32_t code = flushRowBuff(pFileState); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } while (isListEmpty(pFileState->freeBuffs) && pFileState->curRowCount == pFileState->maxRowCount); return TSDB_CODE_SUCCESS; } @@ -756,10 +763,10 @@ int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** QUERY_CHECK_CODE(code, lino, _end); (*pVal) = pPos->pRowBuff; - if (!pPos->needFree) { - code = tdListPrepend(pFileState->usedBuffs, &pPos); - QUERY_CHECK_CODE(code, lino, _end); - } + // if (!pPos->needFree) { + // code = tdListPrepend(pFileState->usedBuffs, &pPos); + // QUERY_CHECK_CODE(code, lino, _end); + // } _end: if (code != TSDB_CODE_SUCCESS) {