opt stream time slice

This commit is contained in:
54liuyao 2024-11-26 08:45:44 +08:00
parent f2f613a529
commit 721170dcb3
1 changed files with 9 additions and 4 deletions

View File

@ -748,7 +748,7 @@ _end:
static int32_t getResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, TSKEY ts,
int64_t groupId, SSlicePoint* pCurPoint, SSlicePoint* pPrevPoint,
SSlicePoint* pNextPoint) {
SSlicePoint* pNextPoint, bool isFwc) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t tmpRes = TSDB_CODE_SUCCESS;
@ -769,6 +769,10 @@ static int32_t getResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillS
setPointBuff(pCurPoint, pFillSup);
pFillSup->cur.key = pCurPoint->pRightRow->key;
pFillSup->cur.pRowVal = (SResultCellData*)pCurPoint->pRightRow->pRowVal;
if (isFwc) {
qDebug("===stream=== only get current point state");
goto _end;
}
} else {
pFillSup->cur.key = pCurPoint->key.ts + 1;
}
@ -1466,6 +1470,7 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, STimeWindowAggSup
return;
}
bool isFwc = (pTwSup->calTrigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE);
// clear the existed group id
pBlock->info.id.groupId = 0;
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
@ -1497,7 +1502,7 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, STimeWindowAggSup
SSlicePoint prevPoint = {0};
SSlicePoint nextPoint = {0};
if (pFillSup->type != TSDB_FILL_LINEAR) {
code = getResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint);
code = getResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint, isFwc);
} else {
code =
getLinearResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint);
@ -1516,7 +1521,7 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, STimeWindowAggSup
}
}
if (pTwSup->calTrigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
if (isFwc) {
setForceWindowCloseFillRule(pFillSup, pFillInfo, pKey->ts);
} else {
setTimeSliceFillRule(pFillSup, pFillInfo, pKey->ts);
@ -1558,7 +1563,7 @@ static void doBuildTimeSliceDeleteResult(SStreamAggSupporter* pAggSup, SStreamFi
SSlicePoint nextPoint = {0};
STimeWindow tw = {0};
if (pFillSup->type != TSDB_FILL_LINEAR) {
code = getResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint);
code = getResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint, false);
} else {
code =
getLinearResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint);