diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 474ac5f98d..d0f85b71ac 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2819,10 +2819,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { doStreamIntervalSaveCheckpoint(pOperator); pAPI->stateStore.streamStateCommit(pInfo->pState); copyDataBlock(pInfo->pCheckpointRes, pBlock); - pOperator->status = OP_RES_TO_RETURN; - qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, - IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", pInfo->numOfDatapack); - pInfo->numOfDatapack = 0; continue; } else { ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); @@ -4248,7 +4244,6 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { } else if (pBlock->info.type == STREAM_CHECKPOINT) { doStreamSessionSaveCheckpoint(pOperator); pAggSup->stateStore.streamStateCommit(pAggSup->pState); - pOperator->status = OP_RES_TO_RETURN; continue; } else { ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); @@ -5539,8 +5534,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { pAPI->stateStore.streamStateCommit(pInfo->pState); pInfo->reCkBlock = true; copyDataBlock(pInfo->pCheckpointRes, pBlock); - qDebug("===stream===return data:single interval. recv datablock num:%" PRIu64, pInfo->numOfDatapack); - pInfo->numOfDatapack = 0; continue; } else { ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");