From 6e19f452ce74fdafdd123eb957a98c4b76eb76cb Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 7 Feb 2024 13:55:01 +0800 Subject: [PATCH] mid pull over --- .../executor/src/streamtimewindowoperator.c | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 954a9659b4..ca39776d3d 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -4283,10 +4283,9 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { qDebug("stask:%s ===stream===%s clear", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType)); return NULL; } else { - doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); - if (pInfo->pDelRes->info.rows != 0) { - printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - return pInfo->pDelRes; + SSDataBlock* resBlock = buildIntervalResult(pOperator); + if (resBlock != NULL) { + return resBlock; } if (pInfo->clearState) { pInfo->clearState = false; @@ -4347,7 +4346,7 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { } else { pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; } - pInfo->clearState = true; + ASSERT(taosArrayGetSize(pInfo->pUpdated) == 0); return pInfo->pDelRes; } continue; @@ -4405,7 +4404,16 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { pInfo->pUpdated = NULL; blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); - return buildIntervalResult(pOperator); + SSDataBlock* resBlock = buildIntervalResult(pOperator); + if (resBlock != NULL) { + return resBlock; + } + if (pInfo->clearState) { + pInfo->clearState = false; + clearFunctionContext(&pOperator->exprSupp); + clearStreamIntervalOperator(pInfo); + } + return NULL; } void setStreamOperatorCompleted(SOperatorInfo* pOperator) {