diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index d5d144ee65..c5f077d57f 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -636,6 +636,7 @@ typedef struct SStreamFillSupporter { SSHashObj* pResMap; bool hasDelete; SStorageAPI* pAPI; + STimeWindow winRange; } SStreamFillSupporter; typedef struct SStreamFillOperatorInfo { @@ -770,6 +771,7 @@ SSDataBlock* getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOperator, int bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo); bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType); bool compareVal(const char* v, const SStateKeys* pKey); +bool inWinRange(STimeWindow* range, STimeWindow* cur); int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo, TSKEY* primaryKeys, int32_t prevPosition, int32_t order); diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 9fce058c4c..ecc2526dd9 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -950,7 +950,10 @@ static bool hasRemainCalc(SStreamFillInfo* pFillInfo) { static void doStreamFillNormal(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) { while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) { - buildFillResult(pFillInfo->pResRow, pFillSup, pFillInfo->current, pBlock); + STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current}; + if (inWinRange(&pFillSup->winRange, &st)) { + buildFillResult(pFillInfo->pResRow, pFillSup, pFillInfo->current, pBlock); + } pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit, pFillSup->interval.precision); } @@ -960,7 +963,8 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo* while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) { uint64_t groupId = pBlock->info.id.groupId; SWinKey key = {.groupId = groupId, .ts = pFillInfo->current}; - if (pFillSup->hasDelete && !checkResult(pFillSup, pFillInfo->current, groupId)) { + STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current}; + if ( ( pFillSup->hasDelete && !checkResult(pFillSup, pFillInfo->current, groupId) ) || !inWinRange(&pFillSup->winRange, &st) ) { pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit, pFillSup->interval.precision); pFillInfo->pLinearInfo->winIndex++; @@ -1345,6 +1349,11 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { pInfo->pFillInfo->preRowKey = INT64_MIN; } + pInfo->pFillSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow; + if (pInfo->pFillSup->winRange.ekey <= 0) { + pInfo->pFillSup->winRange.ekey = INT64_MAX; + } + switch (pBlock->info.type) { case STREAM_RETRIEVE: return pBlock; diff --git a/tests/script/tsim/stream/ignoreCheckUpdate.sim b/tests/script/tsim/stream/ignoreCheckUpdate.sim index 725251b081..108c845e4d 100644 --- a/tests/script/tsim/stream/ignoreCheckUpdate.sim +++ b/tests/script/tsim/stream/ignoreCheckUpdate.sim @@ -206,6 +206,8 @@ print create stream streams3 trigger at_once ignore update 1 into streamt3 as se sql create stream streams3 trigger at_once ignore update 1 into streamt3 as select _wstart c1, count(*) c2, max(b) c3 from st interval(10s); +sleep 2000 + sql insert into t1 values(1648791213000,1,1,1); sql insert into t1 values(1648791213000,2,2,2);