diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 842098d9ff..f90cc6699b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1435,7 +1435,12 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock dumyInfo.cur.pageId = -1; bool isClosed = false; STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX}; - if (tableInserted && isOverdue(tsCol[rowId], &pInfo->twAggSup)) { + bool overDue = isOverdue(tsCol[rowId], &pInfo->twAggSup); + if (pInfo->igExpired && overDue) { + continue; + } + + if (tableInserted && overDue) { win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC); isClosed = isCloseWindow(&win, &pInfo->twAggSup); } @@ -1701,41 +1706,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { SStreamScanInfo* pInfo = pOperator->info; qDebug("stream scan called"); -#if 0 - SStreamState* pState = pTaskInfo->streamInfo.pState; - if (pState) { - printf(">>>>>>>> stream write backend\n"); - SWinKey key = { - .ts = 1, - .groupId = 2, - }; - char tmp[100] = "abcdefg1"; - if (streamStatePut(pState, &key, &tmp, strlen(tmp) + 1) < 0) { - ASSERT(0); - } - - key.ts = 2; - char tmp2[100] = "abcdefg2"; - if (streamStatePut(pState, &key, &tmp2, strlen(tmp2) + 1) < 0) { - ASSERT(0); - } - - key.groupId = 5; - key.ts = 1; - char tmp3[100] = "abcdefg3"; - if (streamStatePut(pState, &key, &tmp3, strlen(tmp3) + 1) < 0) { - ASSERT(0); - } - - char* val2 = NULL; - int32_t sz; - if (streamStateGet(pState, &key, (void**)&val2, &sz) < 0) { - ASSERT(0); - } - printf("stream read %s %d\n", val2, sz); - streamFreeVal(val2); - } -#endif if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 || pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) { @@ -2370,6 +2340,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->partitionSup.needCalc = false; pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate; pInfo->igExpired = pTableScanNode->igExpired; + pInfo->twAggSup.maxTs = INT64_MIN; setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index a384babb94..bfde50761a 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -343,6 +343,13 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect pScan->node.groupAction = GROUP_ACTION_NONE; pScan->node.resultDataOrder = DATA_ORDER_LEVEL_IN_BLOCK; + if (pCxt->pPlanCxt->streamQuery) { + pScan->triggerType = pCxt->pPlanCxt->triggerType; + pScan->watermark = pCxt->pPlanCxt->watermark; + pScan->deleteMark = pCxt->pPlanCxt->deleteMark; + pScan->igExpired = pCxt->pPlanCxt->igExpired; + pScan->igCheckUpdate = pCxt->pPlanCxt->igCheckUpdate; + } // set columns to scan if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index ff59087e02..90a7261074 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -328,11 +328,6 @@ static void scanPathOptSetScanWin(SScanLogicNode* pScan) { pScan->sliding = ((SWindowLogicNode*)pParent)->sliding; pScan->intervalUnit = ((SWindowLogicNode*)pParent)->intervalUnit; pScan->slidingUnit = ((SWindowLogicNode*)pParent)->slidingUnit; - pScan->triggerType = ((SWindowLogicNode*)pParent)->triggerType; - pScan->watermark = ((SWindowLogicNode*)pParent)->watermark; - pScan->deleteMark = ((SWindowLogicNode*)pParent)->deleteMark; - pScan->igExpired = ((SWindowLogicNode*)pParent)->igExpired; - pScan->igCheckUpdate = ((SWindowLogicNode*)pParent)->igCheckUpdate; } } diff --git a/tests/script/tsim/stream/ignoreExpiredData.sim b/tests/script/tsim/stream/ignoreExpiredData.sim index 03f574bc52..b143b7977f 100644 --- a/tests/script/tsim/stream/ignoreExpiredData.sim +++ b/tests/script/tsim/stream/ignoreExpiredData.sim @@ -52,6 +52,7 @@ sql insert into t1 values(1648791213000,1,2,3,1.0); sql insert into t1 values(1648791223001,1,2,3,1.1); sql insert into t1 values(1648791233002,2,2,3,2.1); sql insert into t1 values(1648791243003,2,2,3,3.1); +sleep 300 sql insert into t1 values(1648791200000,4,2,3,4.1); $loop_count = 0 @@ -115,6 +116,7 @@ sql create stream stream_t1 trigger at_once IGNORE EXPIRED 1 into streamtST1 as sql create stream stream_t2 trigger at_once IGNORE EXPIRED 1 into streamtST2 as select _wstart, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st session(ts, 10s) ; sql insert into ts1 values(1648791211000,1,2,3); sql insert into ts1 values(1648791222001,2,2,3); +sleep 300 sql insert into ts2 values(1648791211000,1,2,3); sql insert into ts2 values(1648791222001,2,2,3);