diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 2534c5e9f0..831fd4e883 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1009,6 +1009,22 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { pSup->deleteMark = INT64_MAX; pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData; pInfo->ignoreExpiredData = false; + } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT) { + SStreamCountAggOperatorInfo* pInfo = pOperator->info; + STimeWindowAggSupp* pSup = &pInfo->twAggSup; + + ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); + ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0); + + qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); + + pSup->calTriggerSaved = pSup->calTrigger; + pSup->deleteMarkSaved = pSup->deleteMark; + pSup->calTrigger = STREAM_TRIGGER_AT_ONCE; + pSup->deleteMark = INT64_MAX; + pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData; + pInfo->ignoreExpiredData = false; + qInfo("save stream task:%s, param for state: %d", GET_TASKID(pTaskInfo), pInfo->ignoreExpiredData); } // iterate operator tree