diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 1533527834..ca3165bf92 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2565,7 +2565,9 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap); if (IS_FINAL_OP(pInfo)) { addRetriveWindow(delWins, pInfo); - taosArrayAddAll(pInfo->pDelWins, delWins); + if (pBlock->info.type != STREAM_CLEAR) { + taosArrayAddAll(pInfo->pDelWins, delWins); + } taosArrayDestroy(delWins); continue; } @@ -2577,6 +2579,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { if (pInfo->pDelRes->info.rows != 0) { // process the rest of the data printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); + if (pBlock->info.type == STREAM_CLEAR) { + pInfo->pDelRes->info.type = STREAM_CLEAR; + } else { + pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; + } return pInfo->pDelRes; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 78a1f3e967..bd06f96798 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -274,7 +274,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { qDebug("===stream===try agian batchSize:%d", batchSize); continue; } - + qDebug("===stream===break batchSize:%d", batchSize); break; }