diff --git a/source/libs/executor/inc/tfill.h b/source/libs/executor/inc/tfill.h index 5fd75f9b99..78b3cd2f40 100644 --- a/source/libs/executor/inc/tfill.h +++ b/source/libs/executor/inc/tfill.h @@ -112,6 +112,7 @@ typedef struct SStreamFillInfo { int32_t pos; SArray* delRanges; int32_t delIndex; + uint64_t curGroupId; } SStreamFillInfo; int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows); diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 92782f67fd..98beb30c30 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -520,7 +520,7 @@ void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, pFillSup->cur.pRowVal = curVal; SStreamStateCur* pCur = streamStateFillSeekKeyPrev(pState, &key); - SWinKey preKey = {.groupId = groupId}; + SWinKey preKey = {.ts = INT64_MIN, .groupId = groupId}; void* preVal = NULL; int32_t preVLen = 0; code = streamStateGetGroupKVByCur(pCur, &preKey, (const void**)&preVal, &preVLen); @@ -542,7 +542,7 @@ void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, pCur = streamStateFillSeekKeyNext(pState, &key); } - SWinKey nextKey = {.groupId = groupId}; + SWinKey nextKey = {.ts = INT64_MIN, .groupId = groupId}; void* nextVal = NULL; int32_t nextVLen = 0; code = streamStateGetGroupKVByCur(pCur, &nextKey, (const void**)&nextVal, &nextVLen); @@ -1242,6 +1242,11 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { } printDataBlock(pBlock, "stream fill recv"); + if (pInfo->pFillInfo->curGroupId != pBlock->info.id.groupId) { + pInfo->pFillInfo->curGroupId = pBlock->info.id.groupId; + pInfo->pFillInfo->preRowKey = INT64_MIN; + } + switch (pBlock->info.type) { case STREAM_RETRIEVE: return pBlock; @@ -1392,6 +1397,7 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock* pFillInfo->type = pFillSup->type; pFillInfo->delRanges = taosArrayInit(16, sizeof(STimeRange)); pFillInfo->delIndex = 0; + pFillInfo->curGroupId = 0; return pFillInfo; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 2d11410818..73277bf5ae 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2854,7 +2854,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin SStreamScanInfo* pScanInfo = downstream->info; pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type}; pScanInfo->pState = pAggSup->pState; - if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) { + if ( (!pScanInfo->igCheckUpdate || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) && !pScanInfo->pUpdateInfo ) { pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark); } pScanInfo->twAggSup = *pTwSup; diff --git a/tests/script/tsim/stream/state1.sim b/tests/script/tsim/stream/state1.sim index f2fd2beb6f..775528bd4e 100644 --- a/tests/script/tsim/stream/state1.sim +++ b/tests/script/tsim/stream/state1.sim @@ -102,8 +102,8 @@ print step 2 sql create database test2 vgroups 1; sql use test2; sql create table t1(ts timestamp, a int, b int , c int, d double); -print create stream streams2 trigger at_once watermark 1000s into streamt2 as select _wstart, count(*) c1, count(d) c2 from t1 partition by b state_window(a) -sql create stream streams2 trigger at_once watermark 1000s into streamt2 as select _wstart, count(*) c1, count(d) c2 from t1 partition by b state_window(a); +print create stream streams2 trigger at_once watermark 1000s IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(*) c1, count(d) c2 from t1 partition by b state_window(a) +sql create stream streams2 trigger at_once watermark 1000s IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(*) c1, count(d) c2 from t1 partition by b state_window(a); sql insert into t1 values(1648791213000,1,2,3,1.0); sql insert into t1 values(1648791213010,1,2,3,1.1);