From 6f2a78a4bbd52de74d79b629a75924eadb8a8cb7 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 19 Aug 2024 13:23:09 +0800 Subject: [PATCH] fix issue --- .../executor/src/streamtimesliceoperator.c | 67 +++++++++++++++++-- 1 file changed, 62 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index b182752d33..0607250ca4 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -924,6 +924,60 @@ static void copyCalcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFi } } +static void setForceWindowCloseFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts) { + TSKEY prevWKey = INT64_MIN; + TSKEY nextWKey = INT64_MIN; + if (hasPrevWindow(pFillSup)) { + prevWKey = pFillSup->prev.key; + } + if (hasNextWindow(pFillSup)) { + nextWKey = pFillSup->next.key; + } + TSKEY endTs = adustEndTsKey(ts, pFillSup->cur.key, &pFillSup->interval); + TSKEY startTs = adustPrevTsKey(ts, pFillSup->cur.key, &pFillSup->interval); + + pFillInfo->needFill = true; + pFillInfo->pos = FILL_POS_INVALID; + switch (pFillInfo->type) { + case TSDB_FILL_NULL: + case TSDB_FILL_NULL_F: + case TSDB_FILL_SET_VALUE: + case TSDB_FILL_SET_VALUE_F: { + if (ts != pFillSup->cur.key) { + pFillInfo->pos = FILL_POS_INVALID; + setFillKeyInfo(ts, ts + 1, &pFillSup->interval, pFillInfo); + } else { + pFillInfo->needFill = false; + pFillInfo->pos = FILL_POS_START; + goto _end; + } + copyNonFillValueInfo(pFillSup, pFillInfo); + } break; + case TSDB_FILL_PREV: { + if (ts != pFillSup->cur.key) { + pFillInfo->pos = FILL_POS_INVALID; + setFillKeyInfo(ts, ts + 1, &pFillSup->interval, pFillInfo); + } else if (hasPrevWindow(pFillSup)) { + pFillInfo->pos = FILL_POS_INVALID; + setFillKeyInfo(ts, ts + 1, &pFillSup->interval, pFillInfo); + } else { + pFillInfo->needFill = false; + pFillInfo->pos = FILL_POS_START; + goto _end; + } + pFillInfo->pResRow = &pFillSup->prev; + } break; + default: + ASSERT(0); + break; + } + +_end: + if (ts != pFillSup->cur.key) { + pFillInfo->pos = FILL_POS_INVALID; + } +} + static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts) { if (!hasNextWindow(pFillSup) && !hasPrevWindow(pFillSup)) { pFillInfo->needFill = false; @@ -957,7 +1011,6 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo pFillInfo->pos = FILL_POS_START; } copyNonFillValueInfo(pFillSup, pFillInfo); - pFillInfo->pResRow->key = ts; } break; case TSDB_FILL_PREV: { if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup) && pFillInfo->preRowKey != pFillInfo->prePointKey && @@ -1263,7 +1316,7 @@ void getPrevResKey(int64_t curGroupId, SArray* pKeyArray, int32_t curIndex, TSKE *pNextKey = INT64_MIN; } -void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, +void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, STimeWindowAggSupp* pTwSup, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -1304,8 +1357,12 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, SStreamFillSuppor pFillInfo->nextPointKey = nextPoint.key.ts; } } - - setTimeSliceFillRule(pFillSup, pFillInfo, pKey->ts); + + if (pTwSup->calTrigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { + setForceWindowCloseFillRule(pFillSup, pFillInfo, pKey->ts); + } else { + setTimeSliceFillRule(pFillSup, pFillInfo, pKey->ts); + } doStreamFillRange(pFillSup, pFillInfo, pBlock); releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore); releaseOutputBuf(pAggSup->pState, prevPoint.pResPos, &pAggSup->stateStore); @@ -1330,7 +1387,7 @@ static int32_t buildTimeSliceResult(SOperatorInfo* pOperator, SSDataBlock** ppRe uint16_t opType = pOperator->operatorType; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; - doBuildTimeSlicePointResult(pAggSup, pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes, &pInfo->groupResInfo); + doBuildTimeSlicePointResult(pAggSup, &pInfo->twAggSup, pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes, &pInfo->groupResInfo); if (pInfo->pRes->info.rows != 0) { printDataBlock(pInfo->pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo)); (*ppRes) = pInfo->pRes;