From 721170dcb30b63d698b2b90ca4d1ac3e8fdabdb2 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Tue, 26 Nov 2024 08:45:44 +0800 Subject: [PATCH] opt stream time slice --- source/libs/executor/src/streamtimesliceoperator.c | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index efc5dd6d6a..d307c651fc 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -748,7 +748,7 @@ _end: static int32_t getResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, TSKEY ts, int64_t groupId, SSlicePoint* pCurPoint, SSlicePoint* pPrevPoint, - SSlicePoint* pNextPoint) { + SSlicePoint* pNextPoint, bool isFwc) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; int32_t tmpRes = TSDB_CODE_SUCCESS; @@ -769,6 +769,10 @@ static int32_t getResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillS setPointBuff(pCurPoint, pFillSup); pFillSup->cur.key = pCurPoint->pRightRow->key; pFillSup->cur.pRowVal = (SResultCellData*)pCurPoint->pRightRow->pRowVal; + if (isFwc) { + qDebug("===stream=== only get current point state"); + goto _end; + } } else { pFillSup->cur.key = pCurPoint->key.ts + 1; } @@ -1466,6 +1470,7 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, STimeWindowAggSup return; } + bool isFwc = (pTwSup->calTrigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE); // clear the existed group id pBlock->info.id.groupId = 0; int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); @@ -1497,7 +1502,7 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, STimeWindowAggSup SSlicePoint prevPoint = {0}; SSlicePoint nextPoint = {0}; if (pFillSup->type != TSDB_FILL_LINEAR) { - code = getResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint); + code = getResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint, isFwc); } else { code = getLinearResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint); @@ -1516,7 +1521,7 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, STimeWindowAggSup } } - if (pTwSup->calTrigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { + if (isFwc) { setForceWindowCloseFillRule(pFillSup, pFillInfo, pKey->ts); } else { setTimeSliceFillRule(pFillSup, pFillInfo, pKey->ts); @@ -1558,7 +1563,7 @@ static void doBuildTimeSliceDeleteResult(SStreamAggSupporter* pAggSup, SStreamFi SSlicePoint nextPoint = {0}; STimeWindow tw = {0}; if (pFillSup->type != TSDB_FILL_LINEAR) { - code = getResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint); + code = getResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint, false); } else { code = getLinearResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint);