diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index a9ee304656..f7ed9805e8 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -561,6 +561,7 @@ typedef struct SStreamIntervalOperatorInfo { SSDataBlock* pMidRetriveRes; bool recvPullover; SSDataBlock* pMidPulloverRes; + bool clearState; } SStreamIntervalOperatorInfo; typedef struct SDataGroupInfo { diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 660b181b0e..954a9659b4 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1566,6 +1566,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->recvRetrive = false; pInfo->pMidRetriveRes = createSpecialDataBlock(STREAM_MID_RETRIEVE); pInfo->pMidPulloverRes = createSpecialDataBlock(STREAM_MID_RETRIEVE); + pInfo->clearState = false; pOperator->operatorType = pPhyNode->type; if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) { @@ -4287,6 +4288,11 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); return pInfo->pDelRes; } + if (pInfo->clearState) { + pInfo->clearState = false; + clearFunctionContext(&pOperator->exprSupp); + clearStreamIntervalOperator(pInfo); + } } if (!pInfo->pUpdated) { @@ -4341,6 +4347,7 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { } else { pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; } + pInfo->clearState = true; return pInfo->pDelRes; } continue; @@ -4351,6 +4358,7 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { pInfo->pPullWins, pInfo->numOfChild, pOperator); if (pInfo->recvPullover) { copyDataBlock(pInfo->pMidPulloverRes, pBlock); + pInfo->clearState = true; break; } continue;