diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index ca39776d3d..57348f2902 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -4287,6 +4287,13 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { if (resBlock != NULL) { return resBlock; } + + if (pInfo->recvRetrive) { + pInfo->recvRetrive = false; + printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pInfo->pMidRetriveRes; + } + if (pInfo->clearState) { pInfo->clearState = false; clearFunctionContext(&pOperator->exprSupp); @@ -4371,7 +4378,11 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap); addMidRetriveWindow(delWins, pInfo->pPullDataMap, pInfo->numOfChild); taosArrayDestroy(delWins); - continue; + pInfo->recvRetrive = true; + copyDataBlock(pInfo->pMidRetriveRes, pBlock); + pInfo->pMidRetriveRes->info.type = STREAM_MID_RETRIEVE; + pInfo->clearState = true; + break; } else { ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); } @@ -4408,6 +4419,13 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { if (resBlock != NULL) { return resBlock; } + + if (pInfo->recvRetrive) { + pInfo->recvRetrive = false; + printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pInfo->pMidRetriveRes; + } + if (pInfo->clearState) { pInfo->clearState = false; clearFunctionContext(&pOperator->exprSupp);