From b66260d4e9d0db14966bede2d26202d0bf2fcce6 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 7 Feb 2024 13:29:29 +0800 Subject: [PATCH] mid pull over --- source/libs/executor/inc/executorInt.h | 1 + source/libs/executor/src/streamtimewindowoperator.c | 8 ++++++++ 2 files changed, 9 insertions(+) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index a9ee304656..f7ed9805e8 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -561,6 +561,7 @@ typedef struct SStreamIntervalOperatorInfo { SSDataBlock* pMidRetriveRes; bool recvPullover; SSDataBlock* pMidPulloverRes; + bool clearState; } SStreamIntervalOperatorInfo; typedef struct SDataGroupInfo { diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 660b181b0e..954a9659b4 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1566,6 +1566,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->recvRetrive = false; pInfo->pMidRetriveRes = createSpecialDataBlock(STREAM_MID_RETRIEVE); pInfo->pMidPulloverRes = createSpecialDataBlock(STREAM_MID_RETRIEVE); + pInfo->clearState = false; pOperator->operatorType = pPhyNode->type; if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) { @@ -4287,6 +4288,11 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); return pInfo->pDelRes; } + if (pInfo->clearState) { + pInfo->clearState = false; + clearFunctionContext(&pOperator->exprSupp); + clearStreamIntervalOperator(pInfo); + } } if (!pInfo->pUpdated) { @@ -4341,6 +4347,7 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { } else { pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; } + pInfo->clearState = true; return pInfo->pDelRes; } continue; @@ -4351,6 +4358,7 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { pInfo->pPullWins, pInfo->numOfChild, pOperator); if (pInfo->recvPullover) { copyDataBlock(pInfo->pMidPulloverRes, pBlock); + pInfo->clearState = true; break; } continue;