diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index efc5dd6d6a..d307c651fc 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -748,7 +748,7 @@ _end: static int32_t getResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, TSKEY ts, int64_t groupId, SSlicePoint* pCurPoint, SSlicePoint* pPrevPoint, - SSlicePoint* pNextPoint) { + SSlicePoint* pNextPoint, bool isFwc) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; int32_t tmpRes = TSDB_CODE_SUCCESS; @@ -769,6 +769,10 @@ static int32_t getResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillS setPointBuff(pCurPoint, pFillSup); pFillSup->cur.key = pCurPoint->pRightRow->key; pFillSup->cur.pRowVal = (SResultCellData*)pCurPoint->pRightRow->pRowVal; + if (isFwc) { + qDebug("===stream=== only get current point state"); + goto _end; + } } else { pFillSup->cur.key = pCurPoint->key.ts + 1; } @@ -1466,6 +1470,7 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, STimeWindowAggSup return; } + bool isFwc = (pTwSup->calTrigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE); // clear the existed group id pBlock->info.id.groupId = 0; int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); @@ -1497,7 +1502,7 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, STimeWindowAggSup SSlicePoint prevPoint = {0}; SSlicePoint nextPoint = {0}; if (pFillSup->type != TSDB_FILL_LINEAR) { - code = getResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint); + code = getResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint, isFwc); } else { code = getLinearResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint); @@ -1516,7 +1521,7 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, STimeWindowAggSup } } - if (pTwSup->calTrigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { + if (isFwc) { setForceWindowCloseFillRule(pFillSup, pFillInfo, pKey->ts); } else { setTimeSliceFillRule(pFillSup, pFillInfo, pKey->ts); @@ -1558,7 +1563,7 @@ static void doBuildTimeSliceDeleteResult(SStreamAggSupporter* pAggSup, SStreamFi SSlicePoint nextPoint = {0}; STimeWindow tw = {0}; if (pFillSup->type != TSDB_FILL_LINEAR) { - code = getResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint); + code = getResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint, false); } else { code = getLinearResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint);